diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java index aa6ae056f4f..e54fcf86a35 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java @@ -831,8 +831,13 @@ public class Constants { * Production environment key. */ public static final String PRODUCTION_ENVIRONMENT = "product"; - /* - * private Constants(){ } - */ + + public static final String ETCD3_NOTIFY_MAXTHREADS_KEYS = "etcd3.notify.maxthreads"; + + public static final int DEFAULT_ETCD3_NOTIFY_THREADS = DEFAULT_IO_THREADS; + + public static final String DEFAULT_ETCD3_NOTIFY_QUEUES_KEY = "etcd3.notify.queues"; + + public static final int DEFAULT_GRPC_QUEUES = 300_0000; } diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java index 979caeef82e..d07cad06405 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java @@ -15,24 +15,19 @@ * limitations under the License. */ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package org.apache.dubbo.remoting.etcd.jetcd; +import org.apache.dubbo.common.Constants; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.ExecutorUtil; +import org.apache.dubbo.common.utils.NamedThreadFactory; +import org.apache.dubbo.remoting.etcd.ChildListener; +import org.apache.dubbo.remoting.etcd.StateListener; +import org.apache.dubbo.remoting.etcd.option.OptionUtil; +import org.apache.dubbo.remoting.etcd.support.AbstractEtcdClient; + import com.google.protobuf.ByteString; import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.api.Event; @@ -46,15 +41,6 @@ import io.grpc.Status; import io.grpc.stub.StreamObserver; import io.netty.util.internal.ConcurrentSet; -import org.apache.dubbo.common.Constants; -import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.logger.Logger; -import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.common.utils.NamedThreadFactory; -import org.apache.dubbo.remoting.etcd.ChildListener; -import org.apache.dubbo.remoting.etcd.StateListener; -import org.apache.dubbo.remoting.etcd.option.OptionUtil; -import org.apache.dubbo.remoting.etcd.support.AbstractEtcdClient; import java.util.ArrayList; import java.util.Collections; @@ -63,10 +49,14 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReentrantLock; import static java.util.stream.Collectors.toList; import static org.apache.dubbo.remoting.etcd.jetcd.JEtcdClientWrapper.UTF_8; @@ -79,6 +69,8 @@ public class JEtcdClient extends AbstractEtcdClient { private JEtcdClientWrapper clientWrapper; private ScheduledExecutorService reconnectSchedule; + private ExecutorService notifyExecutor; + private int delayPeriod; private Logger logger = LoggerFactory.getLogger(JEtcdClient.class); @@ -95,7 +87,16 @@ public JEtcdClient(URL url) { }); delayPeriod = getUrl().getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); reconnectSchedule = Executors.newScheduledThreadPool(1, - new NamedThreadFactory("auto-reconnect")); + new NamedThreadFactory("etcd3-watch-auto-reconnect")); + + notifyExecutor = new ThreadPoolExecutor( + 1 + , url.getParameter(Constants.ETCD3_NOTIFY_MAXTHREADS_KEYS, Constants.DEFAULT_ETCD3_NOTIFY_THREADS) + , Constants.DEFAULT_SESSION_TIMEOUT + , TimeUnit.MILLISECONDS + , new LinkedBlockingQueue(url.getParameter(Constants.DEFAULT_ETCD3_NOTIFY_QUEUES_KEY, Constants.DEFAULT_GRPC_QUEUES * 3)) + , new NamedThreadFactory("etcd3-notify", true)); + clientWrapper.start(); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); @@ -166,9 +167,19 @@ public void revokeLease(long lease) { @Override public void doClose() { try { - reconnectSchedule.shutdownNow(); + if (notifyExecutor != null) { + ExecutorUtil.shutdownNow(notifyExecutor, 100); + } } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + try { + if (reconnectSchedule != null) { + ExecutorUtil.shutdownNow(reconnectSchedule, 100); + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); } finally { clientWrapper.doClose(); } @@ -181,9 +192,11 @@ public class EtcdWatcher implements StreamObserver { protected long watchId; protected String path; protected Throwable throwable; - protected Set urls = new ConcurrentSet<>(); + protected volatile Set urls = new ConcurrentSet<>(); private ChildListener listener; + protected ReentrantLock lock = new ReentrantLock(true); + public EtcdWatcher(ChildListener listener) { this.listener = listener; } @@ -220,7 +233,12 @@ public void onNext(WatchResponse response) { } } if (modified > 0) { - listener.childChanged(path, new ArrayList<>(urls)); + notifyExecutor.execute(new Runnable() { + @Override + public void run() { + listener.childChanged(path, new ArrayList<>(urls)); + } + }); } } @@ -257,37 +275,42 @@ public List forPath(String path) { if (!isConnected()) { throw new ClosedClientException("watch client has been closed, path '" + path + "'"); } - if (this.path != null) { - if (this.path.equals(path)) { - return clientWrapper.getChildren(path); - } unwatch(); } - this.watchStub = WatchGrpc.newStub(clientWrapper.getChannel()); - this.watchRequest = watchStub.watch(this); this.path = path; - this.watchRequest.onNext(nextRequest()); - List children = clientWrapper.getChildren(path); + lock.lock(); + try { - /** - * caching the current service - */ - if (!children.isEmpty()) { - this.urls.addAll(filterChildren(children)); - } + this.watchStub = WatchGrpc.newStub(clientWrapper.getChannel()); + this.watchRequest = watchStub.watch(this); + this.watchRequest.onNext(nextRequest()); - return new ArrayList<>(urls); + List children = clientWrapper.getChildren(path); + /** + * caching the current service + */ + if (!children.isEmpty()) { + this.urls.addAll(filterChildren(children)); + } + + return new ArrayList<>(urls); + } finally { + lock.unlock(); + } } private boolean safeUpdate(String service, boolean add) { - synchronized (this) { + lock.lock(); + try { /** * If the collection already contains the specified service, do nothing */ return add ? this.urls.add(service) : this.urls.remove(service); + } finally { + lock.unlock(); } } diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java index e563cc2e822..8515b617ae9 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java @@ -15,24 +15,18 @@ * limitations under the License. */ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package org.apache.dubbo.remoting.etcd.jetcd; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.utils.ConcurrentHashSet; +import org.apache.dubbo.common.utils.NamedThreadFactory; +import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.remoting.etcd.RetryPolicy; +import org.apache.dubbo.remoting.etcd.StateListener; +import org.apache.dubbo.remoting.etcd.option.Constants; + import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.Client; import io.etcd.jetcd.ClientBuilder; @@ -45,22 +39,15 @@ import io.etcd.jetcd.options.PutOption; import io.grpc.ConnectivityState; import io.grpc.ManagedChannel; +import io.grpc.Status; import io.grpc.stub.StreamObserver; import io.grpc.util.RoundRobinLoadBalancerFactory; -import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.logger.Logger; -import org.apache.dubbo.common.logger.LoggerFactory; -import org.apache.dubbo.common.utils.ConcurrentHashSet; -import org.apache.dubbo.common.utils.NamedThreadFactory; -import org.apache.dubbo.common.utils.StringUtils; -import org.apache.dubbo.remoting.etcd.RetryPolicy; -import org.apache.dubbo.remoting.etcd.StateListener; -import org.apache.dubbo.remoting.etcd.option.Constants; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.nio.charset.Charset; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -346,14 +333,15 @@ public long createEphemeral(String path) { public Long call() throws Exception { requiredNotNull(client, failed); - keepAlive(); registeredPaths.add(path); + keepAlive(); + final long leaseId = globalLeaseId; client.getKVClient() .put(ByteSequence.from(path, UTF_8) - , ByteSequence.from(String.valueOf(globalLeaseId), UTF_8) - , PutOption.newBuilder().withLeaseId(globalLeaseId).build()) + , ByteSequence.from(String.valueOf(leaseId), UTF_8) + , PutOption.newBuilder().withLeaseId(leaseId).build()) .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); - return globalLeaseId; + return leaseId; } }, retryPolicy); } catch (Exception e) { @@ -426,27 +414,28 @@ private void keepAlive0(Consumer onFailed) { * causing the extreme scene service to be dropped. * */ + long leaseId = globalLeaseId; try { if (logger.isWarnEnabled()) { - logger.warn("Failed to keep alive for global lease, waiting for retry again."); + logger.warn("Failed to keep alive for global lease '" + leaseId + "', waiting for retry again."); } onFailed.accept(null); } catch (Exception ignored) { - logger.warn("Failed to recover from global lease expired or lease deadline exceeded.", ignored); + logger.warn("Failed to recover from global lease expired or lease deadline exceeded. lease '" + leaseId + "'", ignored); } } } private void recovery() { - /** - * The client is processing reconnection - */ - if (cancelKeepAlive) return; + try { + /** + * The client is processing reconnection + */ + if (cancelKeepAlive) return; - cancelKeepAlive(); + cancelKeepAlive(); - try { Set ephemeralPaths = new HashSet(registeredPaths); if (!ephemeralPaths.isEmpty()) { for (String path : ephemeralPaths) { @@ -460,11 +449,17 @@ private void recovery() { createEphemeral(path); failedRegistered.remove(path); - } catch (Exception ignored) { + } catch (Exception e) { + /** * waiting for retry again */ failedRegistered.add(path); + + Status status = Status.fromThrowable(e); + if (status.getCode() == Status.Code.NOT_FOUND) { + cancelKeepAlive(); + } } } } @@ -499,12 +494,13 @@ public Void call() throws Exception { public String[] endPoints(String backupAddress) { String[] endpoints = backupAddress.split(Constants.COMMA_SEPARATOR); - return Arrays.stream(endpoints) + List addressess = Arrays.stream(endpoints) .map(address -> address.indexOf(Constants.HTTP_SUBFIX_KEY) > -1 ? address : Constants.HTTP_KEY + address) - .collect(toList()) - .toArray(new String[0]); + .collect(toList()); + Collections.shuffle(addressess); + return addressess.toArray(new String[0]); } /** @@ -538,11 +534,14 @@ public void run() { if (connectState != connected) { int notifyState = connected ? StateListener.CONNECTED : StateListener.DISCONNECTED; if (connectionStateListener != null) { - if (connected) { - clearKeepAlive(); + try { + if (connected) { + clearKeepAlive(); + } + connectionStateListener.stateChanged(getClient(), notifyState); + } finally { + cancelKeepAlive = false; } - connectionStateListener.stateChanged(getClient(), notifyState); - cancelKeepAlive = false; } connectState = connected; } @@ -566,9 +565,8 @@ private void cancelKeepAlive() { } } - private synchronized void clearKeepAlive() { + private void clearKeepAlive() { cancelKeepAlive = true; - registeredPaths.clear(); failedRegistered.clear(); cancelKeepAlive(); } @@ -662,8 +660,16 @@ private void retry() { createEphemeral(path); failedRegistered.remove(path); - } catch (Throwable t) { - logger.warn("Failed to retry register(keep alive) for path '" + path + "', waiting for again, cause: " + t.getMessage(), t); + } catch (Throwable e) { + + failedRegistered.add(path); + + Status status = Status.fromThrowable(e); + if (status.getCode() == Status.Code.NOT_FOUND) { + cancelKeepAlive(); + } + + logger.warn("Failed to retry register(keep alive) for path '" + path + "', waiting for again, cause: " + e.getMessage(), e); } } } catch (Throwable t) {