diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosAggregateListener.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosAggregateListener.java new file mode 100644 index 00000000000..5593acd9152 --- /dev/null +++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosAggregateListener.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.registry.nacos; + +import org.apache.dubbo.common.utils.ConcurrentHashSet; +import org.apache.dubbo.registry.NotifyListener; + +import com.alibaba.nacos.api.naming.pojo.Instance; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +public class NacosAggregateListener { + private final NotifyListener notifyListener; + private final Set serviceNames = new ConcurrentHashSet<>(); + private final Map> serviceInstances = new ConcurrentHashMap<>(); + + public NacosAggregateListener(NotifyListener notifyListener) { + this.notifyListener = notifyListener; + } + + public List saveAndAggregateAllInstances(String serviceName, List instances) { + serviceNames.add(serviceName); + serviceInstances.put(serviceName, instances); + return serviceInstances.values().stream().flatMap(List::stream).collect(Collectors.toList()); + } + + public NotifyListener getNotifyListener() { + return notifyListener; + } + + public Set getServiceNames() { + return serviceNames; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + NacosAggregateListener that = (NacosAggregateListener) o; + return Objects.equals(notifyListener, that.notifyListener) && Objects.equals(serviceNames, that.serviceNames) && Objects.equals(serviceInstances, that.serviceInstances); + } + + @Override + public int hashCode() { + return Objects.hash(notifyListener, serviceNames, serviceInstances); + } +} diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java index 9987986ca24..060bf2013a9 100644 --- a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java +++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java @@ -34,24 +34,23 @@ import com.alibaba.nacos.api.naming.listener.NamingEvent; import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacos.api.naming.pojo.ListView; -import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -import java.util.Collections; import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE; import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; @@ -124,7 +123,9 @@ public class NacosRegistry extends FailbackRegistry { private final NacosNamingServiceWrapper namingService; - private final ConcurrentMap> nacosListeners = new ConcurrentHashMap<>(); + private final ConcurrentMap> originToAggregateListener = new ConcurrentHashMap<>(); + + private final ConcurrentMap> nacosListeners = new ConcurrentHashMap<>(); public NacosRegistry(URL url, NacosNamingServiceWrapper namingService) { super(url); @@ -179,7 +180,10 @@ public void doUnregister(final URL url) { @Override public void doSubscribe(final URL url, final NotifyListener listener) { - Set serviceNames = getServiceNames(url, listener); + NacosAggregateListener nacosAggregateListener = new NacosAggregateListener(listener); + originToAggregateListener.computeIfAbsent(url, k -> new ConcurrentHashMap<>()).put(listener, nacosAggregateListener); + + Set serviceNames = getServiceNames(url, nacosAggregateListener); //Set corresponding serviceNames for easy search later if (isServiceNamesWithCompatibleMode(url)) { @@ -187,14 +191,12 @@ public void doSubscribe(final URL url, final NotifyListener listener) { NacosInstanceManageUtil.setCorrespondingServiceNames(serviceName, serviceNames); } } - - doSubscribe(url, listener, serviceNames); + doSubscribe(url, nacosAggregateListener, serviceNames); } - private void doSubscribe(final URL url, final NotifyListener listener, final Set serviceNames) { + private void doSubscribe(final URL url, final NacosAggregateListener listener, final Set serviceNames) { execute(namingService -> { if (isServiceNamesWithCompatibleMode(url)) { - List allCorrespondingInstanceList = Lists.newArrayList(); /** * Get all instances with serviceNames to avoid instance overwrite and but with empty instance mentioned @@ -209,9 +211,8 @@ private void doSubscribe(final URL url, final NotifyListener listener, final Set List instances = namingService.getAllInstances(serviceName, getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)); NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances); - allCorrespondingInstanceList.addAll(instances); + notifySubscriber(url, serviceName, listener, instances); } - notifySubscriber(url, listener, allCorrespondingInstanceList); for (String serviceName : serviceNames) { subscribeEventListener(serviceName, url, listener); } @@ -220,7 +221,7 @@ private void doSubscribe(final URL url, final NotifyListener listener, final Set for (String serviceName : serviceNames) { instances.addAll(namingService.getAllInstances(serviceName , getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP))); - notifySubscriber(url, listener, instances); + notifySubscriber(url, serviceName, listener, instances); subscribeEventListener(serviceName, url, listener); } } @@ -243,18 +244,24 @@ private boolean isServiceNamesWithCompatibleMode(final URL url) { public void doUnsubscribe(URL url, NotifyListener listener) { if (isAdminProtocol(url)) { shutdownServiceNamesLookup(); - } - else { - Set serviceNames = getServiceNames(url, listener); + } else { + Map listenerMap = originToAggregateListener.get(url); + NacosAggregateListener nacosAggregateListener = listenerMap.remove(listener); + if (nacosAggregateListener != null) { + Set serviceNames = getServiceNames(url, nacosAggregateListener); - doUnsubscribe(url, listener, serviceNames); + doUnsubscribe(url, nacosAggregateListener, serviceNames); + } + if (listenerMap.isEmpty()) { + originToAggregateListener.remove(url); + } } } - private void doUnsubscribe(final URL url, final NotifyListener listener, final Set serviceNames) { + private void doUnsubscribe(final URL url, final NacosAggregateListener nacosAggregateListener, final Set serviceNames) { execute(namingService -> { for (String serviceName : serviceNames) { - unsubscribeEventListener(serviceName, url, listener); + unsubscribeEventListener(serviceName, url, nacosAggregateListener); } }); } @@ -272,7 +279,7 @@ private void shutdownServiceNamesLookup() { * @param listener {@link NotifyListener} * @return non-null */ - private Set getServiceNames(URL url, NotifyListener listener) { + private Set getServiceNames(URL url, NacosAggregateListener listener) { if (isAdminProtocol(url)) { scheduleServiceNamesLookup(url, listener); return getServiceNamesForOps(url); @@ -306,7 +313,7 @@ private Set filterServiceNames(NacosServiceName serviceName) { Set serviceNames = new LinkedHashSet<>(); execute(namingService -> serviceNames.addAll(namingService.getServicesOfServer(1, Integer.MAX_VALUE, - getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)).getData() + getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)).getData() .stream() .filter(this::isConformRules) .map(NacosServiceName::new) @@ -357,7 +364,7 @@ private boolean isAdminProtocol(URL url) { return ADMIN_PROTOCOL.equals(url.getProtocol()); } - private void scheduleServiceNamesLookup(final URL url, final NotifyListener listener) { + private void scheduleServiceNamesLookup(final URL url, final NacosAggregateListener listener) { if (scheduledExecutorService == null) { scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); scheduledExecutorService.scheduleAtFixedRate(() -> { @@ -510,9 +517,9 @@ private List buildURLs(URL consumerURL, Collection instances) { return urls; } - private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener) + private void subscribeEventListener(String serviceName, final URL url, final NacosAggregateListener listener) throws NacosException { - ConcurrentMap listeners = nacosListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>()); + ConcurrentMap listeners = nacosListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>()); EventListener nacosListener = listeners.computeIfAbsent(listener, k -> { EventListener eventListener = event -> { if (event instanceof NamingEvent) { @@ -528,7 +535,7 @@ private void subscribeEventListener(String serviceName, final URL url, final Not instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName); } - notifySubscriber(url, listener, instances); + notifySubscriber(url, serviceName, listener, instances); } }; return eventListener; @@ -538,14 +545,14 @@ private void subscribeEventListener(String serviceName, final URL url, final Not nacosListener); } - private void unsubscribeEventListener(String serviceName, final URL url, final NotifyListener listener) + private void unsubscribeEventListener(String serviceName, final URL url, final NacosAggregateListener listener) throws NacosException { - ConcurrentMap notifyListenerEventListenerConcurrentMap = nacosListeners.get(url); - if(notifyListenerEventListenerConcurrentMap == null){ + ConcurrentMap notifyListenerEventListenerConcurrentMap = nacosListeners.get(url); + if (notifyListenerEventListenerConcurrentMap == null) { return; } EventListener nacosListener = notifyListenerEventListenerConcurrentMap.get(listener); - if(nacosListener == null){ + if (nacosListener == null) { return; } namingService.unsubscribe(serviceName, @@ -560,14 +567,14 @@ private void unsubscribeEventListener(String serviceName, final URL url, final N * @param listener {@link NotifyListener} * @param instances all {@link Instance instances} */ - private void notifySubscriber(URL url, NotifyListener listener, Collection instances) { + private void notifySubscriber(URL url, String serviceName, NacosAggregateListener listener, Collection instances) { List enabledInstances = new LinkedList<>(instances); if (enabledInstances.size() > 0) { // Instances filterEnabledInstances(enabledInstances); } - List urls = toUrlWithEmpty(url, enabledInstances); - NacosRegistry.this.notify(url, listener, urls); + List aggregatedUrls = toUrlWithEmpty(url, listener.saveAndAggregateAllInstances(serviceName, enabledInstances)); + NacosRegistry.this.notify(url, listener.getNotifyListener(), aggregatedUrls); } /**