From 0c0a4ae0b3613ced8d6c20ba2627161965c9eb33 Mon Sep 17 00:00:00 2001 From: Huxing Zhang Date: Tue, 12 Mar 2019 15:44:39 +0800 Subject: [PATCH 01/13] Minor refactor, no functinoal change. --- .../etcd/jetcd/JEtcdClientWrapper.java | 210 ++++++++---------- 1 file changed, 91 insertions(+), 119 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java index 8515b617ae96..787deab00e7a 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java @@ -87,7 +87,8 @@ public class JEtcdClientWrapper { private RuntimeException failed; private final ScheduledFuture retryFuture; - private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Etcd3RegistryKeepAliveFailedRetryTimer", true)); + private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, + new NamedThreadFactory("Etcd3RegistryKeepAliveFailedRetryTimer", true)); private final Set failedRegistered = new ConcurrentHashSet(); @@ -117,13 +118,11 @@ public JEtcdClientWrapper(URL url) { this.failed = new IllegalStateException("Etcd3 registry is not connected yet, url:" + url); int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); - this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { - public void run() { - try { - retry(); - } catch (Throwable t) { - logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); - } + this.retryFuture = retryExecutor.scheduleWithFixedDelay(() -> { + try { + retry(); + } catch (Throwable t) { + logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); } @@ -170,29 +169,26 @@ public ManagedChannel getChannel() { public List getChildren(String path) { try { return RetryLoops.invokeWithRetry( - new Callable>() { - @Override - public List call() throws Exception { - requiredNotNull(client, failed); - int len = path.length(); - return client.getKVClient() - .get(ByteSequence.from(path, UTF_8), - GetOption.newBuilder().withPrefix(ByteSequence.from(path, UTF_8)).build()) - .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) - .getKvs().stream().parallel() - .filter(pair -> { - String key = pair.getKey().toString(UTF_8); - int index = len, count = 0; - if (key.length() > len) { - for (; (index = key.indexOf(Constants.PATH_SEPARATOR, index)) != -1; ++index) { - if (count++ > 1) break; - } + () -> { + requiredNotNull(client, failed); + int len = path.length(); + return client.getKVClient() + .get(ByteSequence.from(path, UTF_8), + GetOption.newBuilder().withPrefix(ByteSequence.from(path, UTF_8)).build()) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) + .getKvs().stream().parallel() + .filter(pair -> { + String key = pair.getKey().toString(UTF_8); + int index = len, count = 0; + if (key.length() > len) { + for (; (index = key.indexOf(Constants.PATH_SEPARATOR, index)) != -1; ++index) { + if (count++ > 1) break; } - return count == 1; - }) - .map(pair -> pair.getKey().toString(UTF_8)) - .collect(toList()); - } + } + return count == 1; + }) + .map(pair -> pair.getKey().toString(UTF_8)) + .collect(toList()); }, retryPolicy); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); @@ -207,15 +203,12 @@ public boolean isConnected() { public long createLease(long second) { try { return RetryLoops.invokeWithRetry( - new Callable() { - @Override - public Long call() throws Exception { - requiredNotNull(client, failed); - return client.getLeaseClient() - .grant(second) - .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) - .getID(); - } + () -> { + requiredNotNull(client, failed); + return client.getLeaseClient() + .grant(second) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) + .getID(); }, retryPolicy); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); @@ -225,15 +218,12 @@ public Long call() throws Exception { public void revokeLease(long lease) { try { RetryLoops.invokeWithRetry( - new Callable() { - @Override - public Void call() throws Exception { - requiredNotNull(client, failed); - client.getLeaseClient() - .revoke(lease) - .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); - return null; - } + (Callable) () -> { + requiredNotNull(client, failed); + client.getLeaseClient() + .revoke(lease) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); + return null; }, retryPolicy); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); @@ -260,15 +250,12 @@ public long createLease(long ttl, long timeout, TimeUnit unit) public boolean checkExists(String path) { try { return RetryLoops.invokeWithRetry( - new Callable() { - @Override - public Boolean call() throws Exception { - requiredNotNull(client, failed); - return client.getKVClient() - .get(ByteSequence.from(path, UTF_8), GetOption.newBuilder().withCountOnly(true).build()) - .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) - .getCount() > 0; - } + () -> { + requiredNotNull(client, failed); + return client.getKVClient() + .get(ByteSequence.from(path, UTF_8), GetOption.newBuilder().withCountOnly(true).build()) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) + .getCount() > 0; }, retryPolicy); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); @@ -281,17 +268,14 @@ public Boolean call() throws Exception { protected Long find(String path) { try { return RetryLoops.invokeWithRetry( - new Callable() { - @Override - public Long call() throws Exception { - requiredNotNull(client, failed); - return client.getKVClient() - .get(ByteSequence.from(path, UTF_8)) - .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) - .getKvs().stream() - .mapToLong(keyValue -> Long.valueOf(keyValue.getValue().toString(UTF_8))) - .findFirst().getAsLong(); - } + () -> { + requiredNotNull(client, failed); + return client.getKVClient() + .get(ByteSequence.from(path, UTF_8)) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS) + .getKvs().stream() + .mapToLong(keyValue -> Long.valueOf(keyValue.getValue().toString(UTF_8))) + .findFirst().getAsLong(); }, retryPolicy); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); @@ -301,16 +285,13 @@ public Long call() throws Exception { public void createPersistent(String path) { try { RetryLoops.invokeWithRetry( - new Callable() { - @Override - public Void call() throws Exception { - requiredNotNull(client, failed); - client.getKVClient() - .put(ByteSequence.from(path, UTF_8), - ByteSequence.from(String.valueOf(path.hashCode()), UTF_8)) - .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); - return null; - } + (Callable) () -> { + requiredNotNull(client, failed); + client.getKVClient() + .put(ByteSequence.from(path, UTF_8), + ByteSequence.from(String.valueOf(path.hashCode()), UTF_8)) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); + return null; }, retryPolicy); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); @@ -328,21 +309,18 @@ public Void call() throws Exception { public long createEphemeral(String path) { try { return RetryLoops.invokeWithRetry( - new Callable() { - @Override - public Long call() throws Exception { - requiredNotNull(client, failed); - - registeredPaths.add(path); - keepAlive(); - final long leaseId = globalLeaseId; - client.getKVClient() - .put(ByteSequence.from(path, UTF_8) - , ByteSequence.from(String.valueOf(leaseId), UTF_8) - , PutOption.newBuilder().withLeaseId(leaseId).build()) - .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); - return leaseId; - } + () -> { + requiredNotNull(client, failed); + + registeredPaths.add(path); + keepAlive(); + final long leaseId = globalLeaseId; + client.getKVClient() + .put(ByteSequence.from(path, UTF_8) + , ByteSequence.from(String.valueOf(leaseId), UTF_8) + , PutOption.newBuilder().withLeaseId(leaseId).build()) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); + return leaseId; }, retryPolicy); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); @@ -354,6 +332,7 @@ public void keepAlive(long lease) { this.keepAlive(lease, null); } + @SuppressWarnings("unchecked") private void keepAlive(long lease, Consumer onFailed) { final StreamObserver observer = new Observers.Builder() .onError((e) -> { @@ -471,16 +450,13 @@ private void recovery() { public void delete(String path) { try { RetryLoops.invokeWithRetry( - new Callable() { - @Override - public Void call() throws Exception { - requiredNotNull(client, failed); - client.getKVClient() - .delete(ByteSequence.from(path, UTF_8)) - .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); - registeredPaths.remove(path); - return null; - } + (Callable) () -> { + requiredNotNull(client, failed); + client.getKVClient() + .delete(ByteSequence.from(path, UTF_8)) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); + registeredPaths.remove(path); + return null; }, retryPolicy); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); @@ -527,26 +503,22 @@ public void start() { } try { - this.future = reconnectNotify.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - boolean connected = isConnected(); - if (connectState != connected) { - int notifyState = connected ? StateListener.CONNECTED : StateListener.DISCONNECTED; - if (connectionStateListener != null) { - try { - if (connected) { - clearKeepAlive(); - } - connectionStateListener.stateChanged(getClient(), notifyState); - } finally { - cancelKeepAlive = false; + this.future = reconnectNotify.scheduleWithFixedDelay(() -> { + boolean connected = isConnected(); + if (connectState != connected) { + int notifyState = connected ? StateListener.CONNECTED : StateListener.DISCONNECTED; + if (connectionStateListener != null) { + try { + if (connected) { + clearKeepAlive(); } + connectionStateListener.stateChanged(getClient(), notifyState); + } finally { + cancelKeepAlive = false; } - connectState = connected; } + connectState = connected; } - }, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD, TimeUnit.MILLISECONDS); } catch (Throwable t) { logger.error("monitor reconnect status failed.", t); From c9c428b2d72433c761c48ce9a937fd5b4c28be64 Mon Sep 17 00:00:00 2001 From: Huxing Zhang Date: Tue, 12 Mar 2019 15:50:23 +0800 Subject: [PATCH 02/13] Separate ConnectionStateListener --- .../etcd/jetcd/ConnectionStateListener.java | 31 +++++++++++++++++++ .../etcd/jetcd/JEtcdClientWrapper.java | 10 ------ 2 files changed, 31 insertions(+), 10 deletions(-) create mode 100644 dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java new file mode 100644 index 000000000000..788aa401e9be --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.remoting.etcd.jetcd; + +import io.etcd.jetcd.Client; + +public interface ConnectionStateListener { + + /** + * Called when there is a state change in the connection + * + * @param client the client + * @param newState the new state + */ + void stateChanged(Client client, int newState); +} \ No newline at end of file diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java index 787deab00e7a..08bbfb87e9f8 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java @@ -651,16 +651,6 @@ private void retry() { } } - public interface ConnectionStateListener { - /** - * Called when there is a state change in the connection - * - * @param client the client - * @param newState the new state - */ - public void stateChanged(Client client, int newState); - } - /** * default request timeout */ From d2acfcf4b90eea2a84fa367d1999f89848ce1b4f Mon Sep 17 00:00:00 2001 From: Huxing Zhang Date: Tue, 12 Mar 2019 15:53:08 +0800 Subject: [PATCH 03/13] Simplify code --- .../dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java index 08bbfb87e9f8..a19e0161ed39 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java @@ -470,13 +470,13 @@ public void delete(String path) { public String[] endPoints(String backupAddress) { String[] endpoints = backupAddress.split(Constants.COMMA_SEPARATOR); - List addressess = Arrays.stream(endpoints) - .map(address -> address.indexOf(Constants.HTTP_SUBFIX_KEY) > -1 + List addresses = Arrays.stream(endpoints) + .map(address -> address.contains(Constants.HTTP_SUBFIX_KEY) ? address : Constants.HTTP_KEY + address) .collect(toList()); - Collections.shuffle(addressess); - return addressess.toArray(new String[0]); + Collections.shuffle(addresses); + return addresses.toArray(new String[0]); } /** From 4d610026b30c0eff908b31840d944cb6d9e6e36a Mon Sep 17 00:00:00 2001 From: Huxing Zhang Date: Tue, 12 Mar 2019 15:54:38 +0800 Subject: [PATCH 04/13] Fix typo --- .../remoting/etcd/jetcd/JEtcdClientWrapper.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java index a19e0161ed39..41d2c2d5daaf 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java @@ -129,15 +129,15 @@ public JEtcdClientWrapper(URL url) { private Client prepareClient(URL url) { - int maxInboudSize = DEFAULT_INBOUT_SIZE; - if (StringUtils.isNotEmpty(System.getProperty(GRPC_MAX_INBOUD_SIZE_KEY))) { - maxInboudSize = Integer.valueOf(System.getProperty(GRPC_MAX_INBOUD_SIZE_KEY)); + int maxInboundSize = DEFAULT_INBOUND_SIZE; + if (StringUtils.isNotEmpty(System.getProperty(GRPC_MAX_INBOUND_SIZE_KEY))) { + maxInboundSize = Integer.valueOf(System.getProperty(GRPC_MAX_INBOUND_SIZE_KEY)); } ClientBuilder clientBuilder = Client.builder() .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance()) .endpoints(endPoints(url.getBackupAddress())) - .maxInboundMessageSize(maxInboudSize); + .maxInboundMessageSize(maxInboundSize); return clientBuilder.build(); } @@ -656,9 +656,9 @@ private void retry() { */ public static final long DEFAULT_REQUEST_TIMEOUT = obtainRequestTimeout(); - public static final int DEFAULT_INBOUT_SIZE = 100 * 1024 * 1024; + public static final int DEFAULT_INBOUND_SIZE = 100 * 1024 * 1024; - public static final String GRPC_MAX_INBOUD_SIZE_KEY = "grpc.max.inbound.size"; + public static final String GRPC_MAX_INBOUND_SIZE_KEY = "grpc.max.inbound.size"; public static final String ETCD_REQUEST_TIMEOUT_KEY = "etcd.request.timeout"; From fcbd289baee55335720d159a7cb5b35aec76a18a Mon Sep 17 00:00:00 2001 From: Huxing Zhang Date: Tue, 12 Mar 2019 16:12:32 +0800 Subject: [PATCH 05/13] Support get external config from etcd config center --- dubbo-bom/pom.xml | 5 ++ .../dubbo-configcenter-etcd/pom.xml | 46 +++++++++++ .../etcd/EtcdDynamicConfiguration.java | 76 +++++++++++++++++++ .../etcd/EtcdDynamicConfigurationFactory.java | 33 ++++++++ ...o.configcenter.DynamicConfigurationFactory | 1 + dubbo-configcenter/pom.xml | 1 + .../dubbo/remoting/etcd/EtcdClient.java | 8 ++ .../remoting/etcd/jetcd/JEtcdClient.java | 12 +-- .../etcd/jetcd/JEtcdClientWrapper.java | 21 +++++ .../etcd/support/AbstractEtcdClient.java | 2 +- 10 files changed, 198 insertions(+), 7 deletions(-) create mode 100644 dubbo-configcenter/dubbo-configcenter-etcd/pom.xml create mode 100644 dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java create mode 100644 dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationFactory.java create mode 100644 dubbo-configcenter/dubbo-configcenter-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory diff --git a/dubbo-bom/pom.xml b/dubbo-bom/pom.xml index d04495a47ce8..47875b83ffbd 100644 --- a/dubbo-bom/pom.xml +++ b/dubbo-bom/pom.xml @@ -343,6 +343,11 @@ dubbo-configcenter-consul ${project.version} + + org.apache.dubbo + dubbo-configcenter-etcd + ${project.version} + org.apache.dubbo dubbo-metadata-definition diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/pom.xml b/dubbo-configcenter/dubbo-configcenter-etcd/pom.xml new file mode 100644 index 000000000000..60efc8e4bb9a --- /dev/null +++ b/dubbo-configcenter/dubbo-configcenter-etcd/pom.xml @@ -0,0 +1,46 @@ + + + + + + dubbo-configcenter + org.apache.dubbo + 2.7.1-SNAPSHOT + + 4.0.0 + + dubbo-configcenter-etcd + jar + ${project.artifactId} + The etcd implementation of the config-center api + + + + org.apache.dubbo + dubbo-configcenter-api + ${project.parent.version} + + + org.apache.dubbo + dubbo-remoting-etcd3 + ${project.parent.version} + + + \ No newline at end of file diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java new file mode 100644 index 000000000000..9e66617d1a61 --- /dev/null +++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.configcenter.support.etcd; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.configcenter.ConfigurationListener; +import org.apache.dubbo.configcenter.DynamicConfiguration; +import org.apache.dubbo.remoting.etcd.EtcdClient; +import org.apache.dubbo.remoting.etcd.jetcd.JEtcdClient; + +import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY; +import static org.apache.dubbo.common.Constants.PATH_SEPARATOR; + +/** + * The etcd implementation of {@link DynamicConfiguration} + */ +public class EtcdDynamicConfiguration implements DynamicConfiguration { + + /** + * The final root path would be: /$NAME_SPACE/config + */ + private String rootPath; + + /** + * The etcd client + */ + private final EtcdClient etcdClient; + + EtcdDynamicConfiguration(URL url) { + rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config"; + etcdClient = new JEtcdClient(url); + } + + @Override + public void addListener(String key, String group, ConfigurationListener listener) { + + } + + @Override + public void removeListener(String key, String group, ConfigurationListener listener) { + + } + + // TODO Abstract the logic into super class + @Override + public String getConfig(String key, String group, long timeout) throws IllegalStateException { + if (StringUtils.isNotEmpty(group)) { + key = group + PATH_SEPARATOR + key; + } else { + int i = key.lastIndexOf("."); + key = key.substring(0, i) + PATH_SEPARATOR + key.substring(i + 1); + } + return (String) getInternalProperty(rootPath + PATH_SEPARATOR + key); + } + + @Override + public Object getInternalProperty(String key) { + return etcdClient.getKVValue(key); + } +} diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationFactory.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationFactory.java new file mode 100644 index 000000000000..02e91a62db7d --- /dev/null +++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.configcenter.support.etcd; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.configcenter.AbstractDynamicConfigurationFactory; +import org.apache.dubbo.configcenter.DynamicConfiguration; + +/** + * The etcd implementation of {@link AbstractDynamicConfigurationFactory} + */ +public class EtcdDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory { + + @Override + protected DynamicConfiguration createDynamicConfiguration(URL url) { + return new EtcdDynamicConfiguration(url); + } +} diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory new file mode 100644 index 000000000000..d84b1ae0e1a9 --- /dev/null +++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory @@ -0,0 +1 @@ +etcd=org.apache.dubbo.configcenter.support.etcd.EtcdDynamicConfigurationFactory \ No newline at end of file diff --git a/dubbo-configcenter/pom.xml b/dubbo-configcenter/pom.xml index fa703be6864b..92f727d23213 100644 --- a/dubbo-configcenter/pom.xml +++ b/dubbo-configcenter/pom.xml @@ -34,5 +34,6 @@ dubbo-configcenter-zookeeper dubbo-configcenter-apollo dubbo-configcenter-consul + dubbo-configcenter-etcd diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java index b1e765d3416e..698b3fc12bd7 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java @@ -164,4 +164,12 @@ public long createLease(long ttl, long timeout, TimeUnit unit) */ void revokeLease(long lease); + + /** + * Get the value of the specified key. + * @param key the specified key + * @return null if the value is not found + */ + String getKVValue(String key); + } diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java index d07cad064055..b851759fff15 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java @@ -185,6 +185,11 @@ public void doClose() { } } + @Override + public String getKVValue(String key) { + return clientWrapper.getKVValue(key); + } + public class EtcdWatcher implements StreamObserver { protected WatchGrpc.WatchStub watchStub; @@ -233,12 +238,7 @@ public void onNext(WatchResponse response) { } } if (modified > 0) { - notifyExecutor.execute(new Runnable() { - @Override - public void run() { - listener.childChanged(path, new ArrayList<>(urls)); - } - }); + notifyExecutor.execute(() -> listener.childChanged(path, new ArrayList<>(urls))); } } diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java index 41d2c2d5daaf..ab65383f44ed 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java @@ -31,9 +31,11 @@ import io.etcd.jetcd.Client; import io.etcd.jetcd.ClientBuilder; import io.etcd.jetcd.CloseableClient; +import io.etcd.jetcd.KeyValue; import io.etcd.jetcd.Observers; import io.etcd.jetcd.common.exception.ErrorCode; import io.etcd.jetcd.common.exception.EtcdException; +import io.etcd.jetcd.kv.GetResponse; import io.etcd.jetcd.lease.LeaseKeepAliveResponse; import io.etcd.jetcd.options.GetOption; import io.etcd.jetcd.options.PutOption; @@ -610,6 +612,25 @@ public static void requiredNotNull(Object obj, RuntimeException exeception) { } } + public String getKVValue(String key) { + if (null == key) { + return null; + } + + CompletableFuture responseFuture = this.client.getKVClient().get(ByteSequence.from(key, UTF_8)); + + try { + List result = responseFuture.get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS).getKvs(); + if (!result.isEmpty()) { + return result.get(0).getValue().toString(UTF_8); + } + } catch (Exception e) { + // ignore + } + + return null; + } + private void retry() { if (!failedRegistered.isEmpty()) { Set failed = new HashSet(failedRegistered); diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java index 31752bffe8f6..17255db76f9b 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java @@ -99,7 +99,7 @@ public Set getSessionListeners() { public List addChildListener(String path, final ChildListener listener) { ConcurrentMap listeners = childListeners.get(path); if (listeners == null) { - childListeners.putIfAbsent(path, new ConcurrentHashMap()); + childListeners.putIfAbsent(path, new ConcurrentHashMap<>()); listeners = childListeners.get(path); } WatcherListener targetListener = listeners.get(listener); From 945fbd3e5d4aad3fdfcd2ccad493552e42a6dcf0 Mon Sep 17 00:00:00 2001 From: Huxing Zhang Date: Wed, 13 Mar 2019 10:44:48 +0800 Subject: [PATCH 06/13] Polish diamond operator --- .../apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java index 17255db76f9b..5fecd14d395e 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java @@ -57,7 +57,7 @@ public abstract class AbstractEtcdClient implements EtcdClient private final Set stateListeners = new ConcurrentHashSet<>(); - private final ConcurrentMap> childListeners = new ConcurrentHashMap>(); + private final ConcurrentMap> childListeners = new ConcurrentHashMap<>(); private final List categroies = Arrays.asList(Constants.PROVIDERS_CATEGORY , Constants.CONSUMERS_CATEGORY , Constants.ROUTERS_CATEGORY From 3133d8666126c038103ceeac6cac3d96696b51ec Mon Sep 17 00:00:00 2001 From: Huxing Zhang Date: Wed, 13 Mar 2019 15:55:11 +0800 Subject: [PATCH 07/13] Initial etcd support as config center --- .../support/etcd/EtcdConfigListener.java | 59 ++++++++ .../etcd/EtcdDynamicConfiguration.java | 18 ++- .../etcd/EtcdDynamicConfigurationTest.java | 141 ++++++++++++++++++ .../dubbo/remoting/etcd/EtcdClient.java | 10 ++ .../remoting/etcd/jetcd/JEtcdClient.java | 11 ++ .../remoting/etcd/jetcd/JEtcdClientTest.java | 33 ++++ 6 files changed, 271 insertions(+), 1 deletion(-) create mode 100644 dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdConfigListener.java create mode 100644 dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdConfigListener.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdConfigListener.java new file mode 100644 index 000000000000..af6d7a06f87b --- /dev/null +++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdConfigListener.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.configcenter.support.etcd; + +import io.etcd.jetcd.Watch; +import io.etcd.jetcd.watch.WatchEvent; +import io.etcd.jetcd.watch.WatchResponse; +import org.apache.dubbo.configcenter.ConfigChangeEvent; +import org.apache.dubbo.configcenter.ConfigurationListener; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * This class listens to Etcd changes and notify the + * {@link org.apache.dubbo.configcenter.ConfigurationListener} + */ +public class EtcdConfigListener implements Watch.Listener { + + private ConfigurationListener listener; + + public EtcdConfigListener(ConfigurationListener listener) { + this.listener = listener; + } + + @Override + public void onNext(WatchResponse response) { + for (WatchEvent etcdEvent : response.getEvents()) { + ConfigChangeEvent event = new ConfigChangeEvent( + etcdEvent.getKeyValue().getKey().toString(UTF_8), + etcdEvent.getKeyValue().getValue().toString(UTF_8)); + listener.process(event); + } + } + + @Override + public void onError(Throwable throwable) { + // ignore + } + + @Override + public void onCompleted() { + // ignore + } +} diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java index 9e66617d1a61..ac37ae3dd5f7 100644 --- a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java +++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java @@ -24,6 +24,9 @@ import org.apache.dubbo.remoting.etcd.EtcdClient; import org.apache.dubbo.remoting.etcd.jetcd.JEtcdClient; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY; import static org.apache.dubbo.common.Constants.PATH_SEPARATOR; @@ -42,14 +45,21 @@ public class EtcdDynamicConfiguration implements DynamicConfiguration { */ private final EtcdClient etcdClient; + /** + * The map store the key to {@link EtcdConfigListener} mapping + */ + private final ConcurrentMap watchListenerMap; + EtcdDynamicConfiguration(URL url) { rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config"; etcdClient = new JEtcdClient(url); + watchListenerMap = new ConcurrentHashMap<>(); } @Override public void addListener(String key, String group, ConfigurationListener listener) { - + String normalizedKey = convertKey(key); + etcdClient.addWatchListener(normalizedKey, new EtcdConfigListener(listener)); } @Override @@ -73,4 +83,10 @@ public String getConfig(String key, String group, long timeout) throws IllegalSt public Object getInternalProperty(String key) { return etcdClient.getKVValue(key); } + + + private String convertKey(String key) { + int index = key.lastIndexOf('.'); + return rootPath + PATH_SEPARATOR + key.substring(0, index) + PATH_SEPARATOR + key.substring(index + 1); + } } diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java new file mode 100644 index 000000000000..87143fdcaccd --- /dev/null +++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dubbo.configcenter.support.etcd; + +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.Client; +import org.apache.dubbo.common.Constants; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.configcenter.ConfigChangeEvent; +import org.apache.dubbo.configcenter.ConfigurationListener; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * Unit test for etcd config center support + * TODO Integrate with https://github.com/etcd-io/jetcd#launcher or using mock data. + */ +@Disabled +public class EtcdDynamicConfigurationTest { + + private static final String ENDPOINT = "http://127.0.0.1:2379"; + + private static EtcdDynamicConfiguration config; + + private static Client etcdClient; + + @Test + public void testGetConfig() { + put("/dubbo/config/org.apache.dubbo.etcd.testService/configurators", "hello"); + put("/dubbo/config/test/dubbo.properties", "aaa=bbb"); + Assertions.assertEquals("hello", config.getConfig("org.apache.dubbo.etcd.testService.configurators")); + Assertions.assertEquals("aaa=bbb", config.getConfig("dubbo.properties", "test")); + } + + @Test + public void testAddListener() throws Exception { + CountDownLatch latch = new CountDownLatch(4); + TestListener listener1 = new TestListener(latch); + TestListener listener2 = new TestListener(latch); + TestListener listener3 = new TestListener(latch); + TestListener listener4 = new TestListener(latch); + config.addListener("AService.configurators", listener1); + config.addListener("AService.configurators", listener2); + config.addListener("testapp.tagrouters", listener3); + config.addListener("testapp.tagrouters", listener4); + + put("/dubbo/config/AService/configurators", "new value1"); + Thread.sleep(200); + put("/dubbo/config/testapp/tagrouters", "new value2"); + Thread.sleep(200); + put("/dubbo/config/testapp", "new value3"); + + Thread.sleep(1000); + + Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS)); + Assertions.assertEquals(1, listener1.getCount("/dubbo/config/AService/configurators")); + Assertions.assertEquals(1, listener2.getCount("/dubbo/config/AService/configurators")); + Assertions.assertEquals(1, listener3.getCount("/dubbo/config/testapp/tagrouters")); + Assertions.assertEquals(1, listener4.getCount("/dubbo/config/testapp/tagrouters")); + + Assertions.assertEquals("new value1", listener1.getValue()); + Assertions.assertEquals("new value1", listener2.getValue()); + Assertions.assertEquals("new value2", listener3.getValue()); + Assertions.assertEquals("new value2", listener4.getValue()); + } + + private class TestListener implements ConfigurationListener { + private CountDownLatch latch; + private String value; + private Map countMap = new HashMap<>(); + + public TestListener(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void process(ConfigChangeEvent event) { + Integer count = countMap.computeIfAbsent(event.getKey(), k -> 0); + countMap.put(event.getKey(), ++count); + value = event.getValue(); + latch.countDown(); + } + + public int getCount(String key) { + return countMap.get(key); + } + + public String getValue() { + return value; + } + } + + static void put(String key, String value) { + try { + etcdClient.getKVClient() + .put(ByteSequence.from(key, UTF_8), ByteSequence.from(value, UTF_8)) + .get(); + } catch (Exception e) { + System.out.println("Error put value to etcd."); + } + } + + @BeforeAll + static void setUp() { + etcdClient = Client.builder().endpoints(ENDPOINT).build(); + // timeout in 15 seconds. + URL url = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.etcd.testService") + .addParameter(Constants.SESSION_TIMEOUT_KEY, 15000); + config = new EtcdDynamicConfiguration(url); + } + + @AfterAll + static void tearDown() { + etcdClient.close(); + } +} diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java index 698b3fc12bd7..eeb5affe2121 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java @@ -33,6 +33,7 @@ */ package org.apache.dubbo.remoting.etcd; +import io.etcd.jetcd.Watch; import org.apache.dubbo.common.URL; import java.util.List; @@ -172,4 +173,13 @@ public long createLease(long ttl, long timeout, TimeUnit unit) */ String getKVValue(String key); + + /** + * Watch the specified key with given listener. + * The listener will be notified with future update to the key + * @param key the key to watch + * @param listener the listener to notify + */ + void addWatchListener(String key, Watch.Listener listener); + } diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java index b851759fff15..58329adf3b20 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java @@ -17,6 +17,8 @@ package org.apache.dubbo.remoting.etcd.jetcd; +import io.etcd.jetcd.Client; +import io.etcd.jetcd.Watch; import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.logger.Logger; @@ -190,6 +192,15 @@ public String getKVValue(String key) { return clientWrapper.getKVValue(key); } + @Override + public void addWatchListener(String key, Watch.Listener listener) { + Client client = clientWrapper.getClient(); + if (client == null) { + return; + } + client.getWatchClient().watch(ByteSequence.from(key, UTF_8), listener); + } + public class EtcdWatcher implements StreamObserver { protected WatchGrpc.WatchStub watchStub; diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java index 19254abeac78..dfa66ac411b8 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java @@ -33,7 +33,11 @@ */ package org.apache.dubbo.remoting.etcd.jetcd; +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.Client; +import io.etcd.jetcd.Watch; import io.etcd.jetcd.common.exception.ClosedClientException; +import io.etcd.jetcd.watch.WatchEvent; import io.grpc.Status; import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; @@ -47,6 +51,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static java.nio.charset.StandardCharsets.UTF_8; + @Disabled public class JEtcdClientTest { @@ -75,6 +81,33 @@ public void test_watch_when_create_path() throws InterruptedException { client.delete(child); } + @Test + public void test_watch_when_modify() { + String path = "/dubbo/config/jetcd-client-unit-test/configurators"; + String endpoint = "http://127.0.0.1:2379"; + CountDownLatch latch = new CountDownLatch(1); + ByteSequence key = ByteSequence.from(path, UTF_8); + + Watch.Listener listener = Watch.listener(response -> { + for (WatchEvent event : response.getEvents()) { + Assertions.assertEquals("PUT", event.getEventType().toString()); + Assertions.assertEquals(path, event.getKeyValue().getKey().toString(UTF_8)); + Assertions.assertEquals("Hello", event.getKeyValue().getValue().toString(UTF_8)); + } + latch.countDown(); + }); + + try (Client client = Client.builder().endpoints(endpoint).build(); + Watch watch = client.getWatchClient(); + Watch.Watcher watcher = watch.watch(key, listener)) { + // try to modify the key + client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8)); + latch.await(); + } catch (Exception e) { + Assertions.fail(e.getMessage()); + } + } + @Test public void test_watch_when_create_wrong_path() throws InterruptedException { From 6b77b6ae8c113109a7ea55907b7d8a63fe1b8414 Mon Sep 17 00:00:00 2001 From: Huxing Zhang Date: Wed, 13 Mar 2019 15:55:52 +0800 Subject: [PATCH 08/13] Add a put interface for JEtcdClient --- .../apache/dubbo/remoting/etcd/EtcdClient.java | 9 +++++++++ .../dubbo/remoting/etcd/jetcd/JEtcdClient.java | 5 +++++ .../remoting/etcd/jetcd/JEtcdClientWrapper.java | 17 +++++++++++++++++ 3 files changed, 31 insertions(+) diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java index eeb5affe2121..75a5dfb0b483 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java @@ -182,4 +182,13 @@ public long createLease(long ttl, long timeout, TimeUnit unit) */ void addWatchListener(String key, Watch.Listener listener); + + /** + * Put the key value pair to etcd + * @param key the specified key + * @param value the paired value + * @return true if put success + */ + boolean put(String key, String value); + } diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java index 58329adf3b20..0fbfa41e8f1b 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java @@ -201,6 +201,11 @@ public void addWatchListener(String key, Watch.Listener listener) { client.getWatchClient().watch(ByteSequence.from(key, UTF_8), listener); } + @Override + public boolean put(String key, String value) { + return clientWrapper.put(key, value); + } + public class EtcdWatcher implements StreamObserver { protected WatchGrpc.WatchStub watchStub; diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java index ab65383f44ed..877c6cb425dd 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java @@ -17,6 +17,7 @@ package org.apache.dubbo.remoting.etcd.jetcd; +import io.etcd.jetcd.kv.PutResponse; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; @@ -631,6 +632,22 @@ public String getKVValue(String key) { return null; } + + public boolean put(String key, String value) { + if (key == null || value == null) { + return false; + } + CompletableFuture putFuture = + this.client.getKVClient().put(ByteSequence.from(key, UTF_8), ByteSequence.from(value, UTF_8)); + try { + putFuture.get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); + return true; + } catch (Exception e) { + // ignore + } + return false; + } + private void retry() { if (!failedRegistered.isEmpty()) { Set failed = new HashSet(failedRegistered); From 719b951f964f97c62941692718f57d13a154d3e6 Mon Sep 17 00:00:00 2001 From: Huxing Zhang Date: Fri, 15 Mar 2019 00:41:45 +0800 Subject: [PATCH 09/13] Enhanced Etcd config center support with the ability to watch and cancel watch --- .../support/etcd/EtcdConfigListener.java | 59 -------- .../etcd/EtcdDynamicConfiguration.java | 86 ++++++++++- .../dubbo/remoting/etcd/EtcdClient.java | 11 -- .../remoting/etcd/jetcd/JEtcdClient.java | 16 +-- .../etcd/jetcd/JEtcdClientWrapper.java | 4 +- .../remoting/etcd/jetcd/JEtcdClientTest.java | 135 +++++++++++++++++- 6 files changed, 221 insertions(+), 90 deletions(-) delete mode 100644 dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdConfigListener.java diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdConfigListener.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdConfigListener.java deleted file mode 100644 index af6d7a06f87b..000000000000 --- a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdConfigListener.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dubbo.configcenter.support.etcd; - -import io.etcd.jetcd.Watch; -import io.etcd.jetcd.watch.WatchEvent; -import io.etcd.jetcd.watch.WatchResponse; -import org.apache.dubbo.configcenter.ConfigChangeEvent; -import org.apache.dubbo.configcenter.ConfigurationListener; - -import static java.nio.charset.StandardCharsets.UTF_8; - -/** - * This class listens to Etcd changes and notify the - * {@link org.apache.dubbo.configcenter.ConfigurationListener} - */ -public class EtcdConfigListener implements Watch.Listener { - - private ConfigurationListener listener; - - public EtcdConfigListener(ConfigurationListener listener) { - this.listener = listener; - } - - @Override - public void onNext(WatchResponse response) { - for (WatchEvent etcdEvent : response.getEvents()) { - ConfigChangeEvent event = new ConfigChangeEvent( - etcdEvent.getKeyValue().getKey().toString(UTF_8), - etcdEvent.getKeyValue().getValue().toString(UTF_8)); - listener.process(event); - } - } - - @Override - public void onError(Throwable throwable) { - // ignore - } - - @Override - public void onCompleted() { - // ignore - } -} diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java index ac37ae3dd5f7..67addb4dc616 100644 --- a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java +++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java @@ -17,16 +17,26 @@ package org.apache.dubbo.configcenter.support.etcd; +import com.google.protobuf.ByteString; +import io.etcd.jetcd.api.Event; +import io.etcd.jetcd.api.WatchCancelRequest; +import io.etcd.jetcd.api.WatchCreateRequest; +import io.etcd.jetcd.api.WatchGrpc; +import io.etcd.jetcd.api.WatchRequest; +import io.etcd.jetcd.api.WatchResponse; +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.configcenter.ConfigChangeEvent; import org.apache.dubbo.configcenter.ConfigurationListener; import org.apache.dubbo.configcenter.DynamicConfiguration; -import org.apache.dubbo.remoting.etcd.EtcdClient; import org.apache.dubbo.remoting.etcd.jetcd.JEtcdClient; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY; import static org.apache.dubbo.common.Constants.PATH_SEPARATOR; @@ -43,12 +53,12 @@ public class EtcdDynamicConfiguration implements DynamicConfiguration { /** * The etcd client */ - private final EtcdClient etcdClient; + private final JEtcdClient etcdClient; /** - * The map store the key to {@link EtcdConfigListener} mapping + * The map store the key to {@link EtcdConfigWatcher} mapping */ - private final ConcurrentMap watchListenerMap; + private final ConcurrentMap watchListenerMap; EtcdDynamicConfiguration(URL url) { rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config"; @@ -58,13 +68,18 @@ public class EtcdDynamicConfiguration implements DynamicConfiguration { @Override public void addListener(String key, String group, ConfigurationListener listener) { - String normalizedKey = convertKey(key); - etcdClient.addWatchListener(normalizedKey, new EtcdConfigListener(listener)); + if (watchListenerMap.get(listener) == null) { + EtcdConfigWatcher watcher = new EtcdConfigWatcher(listener); + watchListenerMap.put(listener, watcher); + String normalizedKey = convertKey(key); + watcher.watch(normalizedKey); + } } @Override public void removeListener(String key, String group, ConfigurationListener listener) { - + EtcdConfigWatcher watcher = watchListenerMap.get(listener); + watcher.cancelWatch(); } // TODO Abstract the logic into super class @@ -89,4 +104,61 @@ private String convertKey(String key) { int index = key.lastIndexOf('.'); return rootPath + PATH_SEPARATOR + key.substring(0, index) + PATH_SEPARATOR + key.substring(index + 1); } + + public class EtcdConfigWatcher implements StreamObserver { + + private ConfigurationListener listener; + protected WatchGrpc.WatchStub watchStub; + private StreamObserver observer; + protected long watchId; + private ManagedChannel channel; + + public EtcdConfigWatcher(ConfigurationListener listener) { + this.listener = listener; + this.channel = etcdClient.getChannel(); + } + + @Override + public void onNext(WatchResponse watchResponse) { + this.watchId = watchResponse.getWatchId(); + for (Event etcdEvent : watchResponse.getEventsList()) { + ConfigChangeEvent event = new ConfigChangeEvent( + etcdEvent.getKv().getKey().toString(UTF_8), + etcdEvent.getKv().getValue().toString(UTF_8)); + listener.process(event); + } + } + + @Override + public void onError(Throwable throwable) { + // ignore + } + + @Override + public void onCompleted() { + // ignore + } + + public long getWatchId() { + return watchId; + } + + private void watch(String key) { + watchStub = WatchGrpc.newStub(channel); + observer = watchStub.watch(this); + WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder() + .setKey(ByteString.copyFromUtf8(key)) + .setProgressNotify(true); + WatchRequest req = WatchRequest.newBuilder().setCreateRequest(builder).build(); + observer.onNext(req); + } + + private void cancelWatch() { + WatchCancelRequest watchCancelRequest = + WatchCancelRequest.newBuilder().setWatchId(watchId).build(); + WatchRequest cancelRequest = WatchRequest.newBuilder() + .setCancelRequest(watchCancelRequest).build(); + observer.onNext(cancelRequest); + } + } } diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java index 75a5dfb0b483..286be9344693 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java @@ -33,7 +33,6 @@ */ package org.apache.dubbo.remoting.etcd; -import io.etcd.jetcd.Watch; import org.apache.dubbo.common.URL; import java.util.List; @@ -173,16 +172,6 @@ public long createLease(long ttl, long timeout, TimeUnit unit) */ String getKVValue(String key); - - /** - * Watch the specified key with given listener. - * The listener will be notified with future update to the key - * @param key the key to watch - * @param listener the listener to notify - */ - void addWatchListener(String key, Watch.Listener listener); - - /** * Put the key value pair to etcd * @param key the specified key diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java index 0fbfa41e8f1b..ff4c118b2a33 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java @@ -17,8 +17,7 @@ package org.apache.dubbo.remoting.etcd.jetcd; -import io.etcd.jetcd.Client; -import io.etcd.jetcd.Watch; +import io.grpc.ManagedChannel; import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.logger.Logger; @@ -192,20 +191,15 @@ public String getKVValue(String key) { return clientWrapper.getKVValue(key); } - @Override - public void addWatchListener(String key, Watch.Listener listener) { - Client client = clientWrapper.getClient(); - if (client == null) { - return; - } - client.getWatchClient().watch(ByteSequence.from(key, UTF_8), listener); - } - @Override public boolean put(String key, String value) { return clientWrapper.put(key, value); } + public ManagedChannel getChannel() { + return clientWrapper.getChannel(); + } + public class EtcdWatcher implements StreamObserver { protected WatchGrpc.WatchStub watchStub; diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java index 877c6cb425dd..c7f472d40110 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java @@ -550,7 +550,9 @@ protected void doClose() { try { cancelKeepAlive = true; - revokeLease(this.globalLeaseId); + if (globalLeaseId > 0) { + revokeLease(this.globalLeaseId); + } } catch (Exception e) { logger.warn("revoke global lease '" + globalLeaseId + "' failed, registry: " + url, e); } diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java index dfa66ac411b8..9674feec35df 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java @@ -33,12 +33,21 @@ */ package org.apache.dubbo.remoting.etcd.jetcd; +import com.google.protobuf.ByteString; import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.Client; import io.etcd.jetcd.Watch; +import io.etcd.jetcd.api.Event; +import io.etcd.jetcd.api.WatchCancelRequest; +import io.etcd.jetcd.api.WatchCreateRequest; +import io.etcd.jetcd.api.WatchGrpc; +import io.etcd.jetcd.api.WatchRequest; +import io.etcd.jetcd.api.WatchResponse; import io.etcd.jetcd.common.exception.ClosedClientException; import io.etcd.jetcd.watch.WatchEvent; +import io.grpc.ManagedChannel; import io.grpc.Status; +import io.grpc.stub.StreamObserver; import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; import org.apache.dubbo.remoting.etcd.ChildListener; @@ -48,8 +57,11 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import static java.nio.charset.StandardCharsets.UTF_8; @@ -93,8 +105,9 @@ public void test_watch_when_modify() { Assertions.assertEquals("PUT", event.getEventType().toString()); Assertions.assertEquals(path, event.getKeyValue().getKey().toString(UTF_8)); Assertions.assertEquals("Hello", event.getKeyValue().getValue().toString(UTF_8)); + latch.countDown(); } - latch.countDown(); + }); try (Client client = Client.builder().endpoints(endpoint).build(); @@ -108,6 +121,111 @@ public void test_watch_when_modify() { } } + @Test + public void testWatchWithGrpc() { + String path = "/dubbo/config/test_watch_with_grpc/configurators"; + String endpoint = "http://127.0.0.1:2379"; + CountDownLatch latch = new CountDownLatch(1); + try (Client client = Client.builder().endpoints(endpoint).build()) { + ManagedChannel channel = getChannel(client); + StreamObserver observer = WatchGrpc.newStub(channel).watch(new StreamObserver() { + @Override + public void onNext(WatchResponse response) { + for (Event event : response.getEventsList()) { + Assertions.assertEquals("PUT", event.getType().toString()); + Assertions.assertEquals(path, event.getKv().getKey().toString(UTF_8)); + Assertions.assertEquals("Hello", event.getKv().getValue().toString(UTF_8)); + latch.countDown(); + } + } + + @Override + public void onError(Throwable throwable) { + + } + + @Override + public void onCompleted() { + + } + }); + WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder() + .setKey(ByteString.copyFrom(path, UTF_8)); + + observer.onNext(WatchRequest.newBuilder().setCreateRequest(builder).build()); + + // try to modify the key + client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8)); + latch.await(5, TimeUnit.SECONDS); + } catch (Exception e) { + Assertions.fail(e.getMessage()); + } + } + + @Test + public void testCancelWatchWithGrpc() { + String path = "/dubbo/config/testCancelWatchWithGrpc/configurators"; + String endpoint = "http://127.0.0.1:2379"; + CountDownLatch updateLatch = new CountDownLatch(1); + CountDownLatch cancelLatch = new CountDownLatch(1); + final AtomicLong watchID = new AtomicLong(-1L); + try (Client client = Client.builder().endpoints(endpoint).build()) { + ManagedChannel channel = getChannel(client); + StreamObserver observer = WatchGrpc.newStub(channel).watch(new StreamObserver() { + @Override + public void onNext(WatchResponse response) { + watchID.set(response.getWatchId()); + for (Event event : response.getEventsList()) { + Assertions.assertEquals("PUT", event.getType().toString()); + Assertions.assertEquals(path, event.getKv().getKey().toString(UTF_8)); + Assertions.assertEquals("Hello", event.getKv().getValue().toString(UTF_8)); + updateLatch.countDown(); + } + if (response.getCanceled()) { + // received the cancel response + cancelLatch.countDown(); + } + } + + @Override + public void onError(Throwable throwable) { + + } + + @Override + public void onCompleted() { + + } + }); + // create + WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder() + .setKey(ByteString.copyFrom(path, UTF_8)); + + // make the grpc call to watch the key + observer.onNext(WatchRequest.newBuilder().setCreateRequest(builder).build()); + + // try to put the value + client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8)); + + // response received, latch counts down to zero + updateLatch.await(); + + WatchCancelRequest watchCancelRequest = + WatchCancelRequest.newBuilder().setWatchId(watchID.get()).build(); + WatchRequest cancelRequest = WatchRequest.newBuilder() + .setCancelRequest(watchCancelRequest).build(); + observer.onNext(cancelRequest); + + // try to put the value + client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello world", UTF_8)); + + cancelLatch.await(); + } catch (Exception e) { + Assertions.fail(e.getMessage()); + } + + } + @Test public void test_watch_when_create_wrong_path() throws InterruptedException { @@ -290,4 +408,19 @@ synchronized int increaseAndGet() { return ++value; } } + + private ManagedChannel getChannel(Client client) { + try { + // hack, use reflection to get the shared channel. + Field connectionField = client.getClass().getDeclaredField("connectionManager"); + connectionField.setAccessible(true); + Object connection = connectionField.get(client); + Method channelMethod = connection.getClass().getDeclaredMethod("getChannel"); + channelMethod.setAccessible(true); + ManagedChannel channel = (ManagedChannel) channelMethod.invoke(connection); + return channel; + } catch (Exception e) { + return null; + } + } } From d1764c8c88956ca77ed7e69b413c645dce2db643 Mon Sep 17 00:00:00 2001 From: Huxing Zhang Date: Fri, 15 Mar 2019 00:42:12 +0800 Subject: [PATCH 10/13] Polish code --- .../dubbo/registry/etcd/EtcdRegistry.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java index 504d521aaedd..f0d94067d258 100644 --- a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java +++ b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java @@ -71,7 +71,7 @@ public class EtcdRegistry extends FailbackRegistry { private final Set anyServices = new ConcurrentHashSet(); - private final ConcurrentMap> etcdListeners = new ConcurrentHashMap>(); + private final ConcurrentMap> etcdListeners = new ConcurrentHashMap<>(); private final EtcdClient etcdClient; private long expirePeriod; @@ -86,14 +86,12 @@ public EtcdRegistry(URL url, EtcdTransporter etcdTransporter) { } this.root = group; etcdClient = etcdTransporter.connect(url); - etcdClient.addStateListener(new StateListener() { - public void stateChanged(int state) { - if (state == CONNECTED) { - try { - recover(); - } catch (Exception e) { - logger.error(e.getMessage(), e); - } + etcdClient.addStateListener(state -> { + if (state == StateListener.CONNECTED) { + try { + recover(); + } catch (Exception e) { + logger.error(e.getMessage(), e); } } }); @@ -345,7 +343,7 @@ protected List toUnsubscribedPath(URL url) { } protected List toUrlsWithoutEmpty(URL consumer, List providers) { - List urls = new ArrayList(); + List urls = new ArrayList<>(); if (providers != null && providers.size() > 0) { for (String provider : providers) { provider = URL.decode(provider); From 8b2a26aaeb6b7585d99ea830c6e1429d313ec9e9 Mon Sep 17 00:00:00 2001 From: Huxing Zhang Date: Fri, 15 Mar 2019 10:53:41 +0800 Subject: [PATCH 11/13] Distinguish modification event and delete event --- .../support/etcd/EtcdDynamicConfiguration.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java index 67addb4dc616..be3235235551 100644 --- a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java +++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java @@ -29,6 +29,7 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.configcenter.ConfigChangeEvent; +import org.apache.dubbo.configcenter.ConfigChangeType; import org.apache.dubbo.configcenter.ConfigurationListener; import org.apache.dubbo.configcenter.DynamicConfiguration; import org.apache.dubbo.remoting.etcd.jetcd.JEtcdClient; @@ -122,9 +123,13 @@ public EtcdConfigWatcher(ConfigurationListener listener) { public void onNext(WatchResponse watchResponse) { this.watchId = watchResponse.getWatchId(); for (Event etcdEvent : watchResponse.getEventsList()) { + ConfigChangeType type = ConfigChangeType.MODIFIED; + if (etcdEvent.getType() == Event.EventType.DELETE) { + type = ConfigChangeType.DELETED; + } ConfigChangeEvent event = new ConfigChangeEvent( etcdEvent.getKv().getKey().toString(UTF_8), - etcdEvent.getKv().getValue().toString(UTF_8)); + etcdEvent.getKv().getValue().toString(UTF_8), type); listener.process(event); } } From 934b835b16b8a10906a850a889c8349e66869764 Mon Sep 17 00:00:00 2001 From: Huxing Zhang Date: Fri, 15 Mar 2019 16:14:12 +0800 Subject: [PATCH 12/13] Add etcd registry and configcenter to dubbo-all --- dubbo-all/pom.xml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/dubbo-all/pom.xml b/dubbo-all/pom.xml index f38201f8e309..f6ecc74a698c 100644 --- a/dubbo-all/pom.xml +++ b/dubbo-all/pom.xml @@ -241,6 +241,13 @@ compile true + + org.apache.dubbo + dubbo-registry-etcd3 + ${project.version} + compile + true + org.apache.dubbo dubbo-monitor-api @@ -360,6 +367,13 @@ compile true + + org.apache.dubbo + dubbo-configcenter-etcd + ${project.version} + compile + true + org.apache.dubbo dubbo-compatible @@ -490,6 +504,7 @@ org.apache.dubbo:dubbo-registry-zookeeper org.apache.dubbo:dubbo-registry-redis org.apache.dubbo:dubbo-registry-consul + org.apache.dubbo:dubbo-registry-etcd3 org.apache.dubbo:dubbo-monitor-api org.apache.dubbo:dubbo-monitor-default org.apache.dubbo:dubbo-config-api @@ -511,6 +526,7 @@ org.apache.dubbo:dubbo-configcenter-apollo org.apache.dubbo:dubbo-configcenter-zookeeper org.apache.dubbo:dubbo-configcenter-consul + org.apache.dubbo:dubbo-configcenter-etcd org.apache.dubbo:dubbo-metadata-report-api org.apache.dubbo:dubbo-metadata-definition org.apache.dubbo:dubbo-metadata-report-redis From 426cbaef9633073761b4556ba78b95eee682e6f8 Mon Sep 17 00:00:00 2001 From: Huxing Zhang Date: Fri, 15 Mar 2019 17:04:39 +0800 Subject: [PATCH 13/13] Watch again when connection is re-established --- .../etcd/EtcdDynamicConfiguration.java | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java index be3235235551..18e90887592e 100644 --- a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java +++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java @@ -32,6 +32,7 @@ import org.apache.dubbo.configcenter.ConfigChangeType; import org.apache.dubbo.configcenter.ConfigurationListener; import org.apache.dubbo.configcenter.DynamicConfiguration; +import org.apache.dubbo.remoting.etcd.StateListener; import org.apache.dubbo.remoting.etcd.jetcd.JEtcdClient; import java.util.concurrent.ConcurrentHashMap; @@ -64,16 +65,25 @@ public class EtcdDynamicConfiguration implements DynamicConfiguration { EtcdDynamicConfiguration(URL url) { rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config"; etcdClient = new JEtcdClient(url); + etcdClient.addStateListener(state -> { + if (state == StateListener.CONNECTED) { + try { + recover(); + } catch (Exception e) { + // ignore + } + } + }); watchListenerMap = new ConcurrentHashMap<>(); } @Override public void addListener(String key, String group, ConfigurationListener listener) { if (watchListenerMap.get(listener) == null) { - EtcdConfigWatcher watcher = new EtcdConfigWatcher(listener); - watchListenerMap.put(listener, watcher); String normalizedKey = convertKey(key); - watcher.watch(normalizedKey); + EtcdConfigWatcher watcher = new EtcdConfigWatcher(normalizedKey, listener); + watchListenerMap.put(listener, watcher); + watcher.watch(); } } @@ -106,6 +116,12 @@ private String convertKey(String key) { return rootPath + PATH_SEPARATOR + key.substring(0, index) + PATH_SEPARATOR + key.substring(index + 1); } + private void recover() { + for (EtcdConfigWatcher watcher: watchListenerMap.values()) { + watcher.watch(); + } + } + public class EtcdConfigWatcher implements StreamObserver { private ConfigurationListener listener; @@ -113,8 +129,10 @@ public class EtcdConfigWatcher implements StreamObserver { private StreamObserver observer; protected long watchId; private ManagedChannel channel; + private String key; - public EtcdConfigWatcher(ConfigurationListener listener) { + public EtcdConfigWatcher(String key, ConfigurationListener listener) { + this.key = key; this.listener = listener; this.channel = etcdClient.getChannel(); } @@ -148,7 +166,7 @@ public long getWatchId() { return watchId; } - private void watch(String key) { + private void watch() { watchStub = WatchGrpc.newStub(channel); observer = watchStub.watch(this); WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()