diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosAggregateListener.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosAggregateListener.java deleted file mode 100644 index 5593acd9152..00000000000 --- a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosAggregateListener.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.registry.nacos; - -import org.apache.dubbo.common.utils.ConcurrentHashSet; -import org.apache.dubbo.registry.NotifyListener; - -import com.alibaba.nacos.api.naming.pojo.Instance; - -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; - -public class NacosAggregateListener { - private final NotifyListener notifyListener; - private final Set serviceNames = new ConcurrentHashSet<>(); - private final Map> serviceInstances = new ConcurrentHashMap<>(); - - public NacosAggregateListener(NotifyListener notifyListener) { - this.notifyListener = notifyListener; - } - - public List saveAndAggregateAllInstances(String serviceName, List instances) { - serviceNames.add(serviceName); - serviceInstances.put(serviceName, instances); - return serviceInstances.values().stream().flatMap(List::stream).collect(Collectors.toList()); - } - - public NotifyListener getNotifyListener() { - return notifyListener; - } - - public Set getServiceNames() { - return serviceNames; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - NacosAggregateListener that = (NacosAggregateListener) o; - return Objects.equals(notifyListener, that.notifyListener) && Objects.equals(serviceNames, that.serviceNames) && Objects.equals(serviceInstances, that.serviceInstances); - } - - @Override - public int hashCode() { - return Objects.hash(notifyListener, serviceNames, serviceInstances); - } -} diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java index 060bf2013a9..9987986ca24 100644 --- a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java +++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java @@ -34,23 +34,24 @@ import com.alibaba.nacos.api.naming.listener.NamingEvent; import com.alibaba.nacos.api.naming.pojo.Instance; import com.alibaba.nacos.api.naming.pojo.ListView; +import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import java.util.Collections; import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE; import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; @@ -123,9 +124,7 @@ public class NacosRegistry extends FailbackRegistry { private final NacosNamingServiceWrapper namingService; - private final ConcurrentMap> originToAggregateListener = new ConcurrentHashMap<>(); - - private final ConcurrentMap> nacosListeners = new ConcurrentHashMap<>(); + private final ConcurrentMap> nacosListeners = new ConcurrentHashMap<>(); public NacosRegistry(URL url, NacosNamingServiceWrapper namingService) { super(url); @@ -180,10 +179,7 @@ public void doUnregister(final URL url) { @Override public void doSubscribe(final URL url, final NotifyListener listener) { - NacosAggregateListener nacosAggregateListener = new NacosAggregateListener(listener); - originToAggregateListener.computeIfAbsent(url, k -> new ConcurrentHashMap<>()).put(listener, nacosAggregateListener); - - Set serviceNames = getServiceNames(url, nacosAggregateListener); + Set serviceNames = getServiceNames(url, listener); //Set corresponding serviceNames for easy search later if (isServiceNamesWithCompatibleMode(url)) { @@ -191,12 +187,14 @@ public void doSubscribe(final URL url, final NotifyListener listener) { NacosInstanceManageUtil.setCorrespondingServiceNames(serviceName, serviceNames); } } - doSubscribe(url, nacosAggregateListener, serviceNames); + + doSubscribe(url, listener, serviceNames); } - private void doSubscribe(final URL url, final NacosAggregateListener listener, final Set serviceNames) { + private void doSubscribe(final URL url, final NotifyListener listener, final Set serviceNames) { execute(namingService -> { if (isServiceNamesWithCompatibleMode(url)) { + List allCorrespondingInstanceList = Lists.newArrayList(); /** * Get all instances with serviceNames to avoid instance overwrite and but with empty instance mentioned @@ -211,8 +209,9 @@ private void doSubscribe(final URL url, final NacosAggregateListener listener, f List instances = namingService.getAllInstances(serviceName, getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)); NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances); - notifySubscriber(url, serviceName, listener, instances); + allCorrespondingInstanceList.addAll(instances); } + notifySubscriber(url, listener, allCorrespondingInstanceList); for (String serviceName : serviceNames) { subscribeEventListener(serviceName, url, listener); } @@ -221,7 +220,7 @@ private void doSubscribe(final URL url, final NacosAggregateListener listener, f for (String serviceName : serviceNames) { instances.addAll(namingService.getAllInstances(serviceName , getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP))); - notifySubscriber(url, serviceName, listener, instances); + notifySubscriber(url, listener, instances); subscribeEventListener(serviceName, url, listener); } } @@ -244,24 +243,18 @@ private boolean isServiceNamesWithCompatibleMode(final URL url) { public void doUnsubscribe(URL url, NotifyListener listener) { if (isAdminProtocol(url)) { shutdownServiceNamesLookup(); - } else { - Map listenerMap = originToAggregateListener.get(url); - NacosAggregateListener nacosAggregateListener = listenerMap.remove(listener); - if (nacosAggregateListener != null) { - Set serviceNames = getServiceNames(url, nacosAggregateListener); + } + else { + Set serviceNames = getServiceNames(url, listener); - doUnsubscribe(url, nacosAggregateListener, serviceNames); - } - if (listenerMap.isEmpty()) { - originToAggregateListener.remove(url); - } + doUnsubscribe(url, listener, serviceNames); } } - private void doUnsubscribe(final URL url, final NacosAggregateListener nacosAggregateListener, final Set serviceNames) { + private void doUnsubscribe(final URL url, final NotifyListener listener, final Set serviceNames) { execute(namingService -> { for (String serviceName : serviceNames) { - unsubscribeEventListener(serviceName, url, nacosAggregateListener); + unsubscribeEventListener(serviceName, url, listener); } }); } @@ -279,7 +272,7 @@ private void shutdownServiceNamesLookup() { * @param listener {@link NotifyListener} * @return non-null */ - private Set getServiceNames(URL url, NacosAggregateListener listener) { + private Set getServiceNames(URL url, NotifyListener listener) { if (isAdminProtocol(url)) { scheduleServiceNamesLookup(url, listener); return getServiceNamesForOps(url); @@ -313,7 +306,7 @@ private Set filterServiceNames(NacosServiceName serviceName) { Set serviceNames = new LinkedHashSet<>(); execute(namingService -> serviceNames.addAll(namingService.getServicesOfServer(1, Integer.MAX_VALUE, - getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)).getData() + getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)).getData() .stream() .filter(this::isConformRules) .map(NacosServiceName::new) @@ -364,7 +357,7 @@ private boolean isAdminProtocol(URL url) { return ADMIN_PROTOCOL.equals(url.getProtocol()); } - private void scheduleServiceNamesLookup(final URL url, final NacosAggregateListener listener) { + private void scheduleServiceNamesLookup(final URL url, final NotifyListener listener) { if (scheduledExecutorService == null) { scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); scheduledExecutorService.scheduleAtFixedRate(() -> { @@ -517,9 +510,9 @@ private List buildURLs(URL consumerURL, Collection instances) { return urls; } - private void subscribeEventListener(String serviceName, final URL url, final NacosAggregateListener listener) + private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener) throws NacosException { - ConcurrentMap listeners = nacosListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>()); + ConcurrentMap listeners = nacosListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>()); EventListener nacosListener = listeners.computeIfAbsent(listener, k -> { EventListener eventListener = event -> { if (event instanceof NamingEvent) { @@ -535,7 +528,7 @@ private void subscribeEventListener(String serviceName, final URL url, final Nac instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName); } - notifySubscriber(url, serviceName, listener, instances); + notifySubscriber(url, listener, instances); } }; return eventListener; @@ -545,14 +538,14 @@ private void subscribeEventListener(String serviceName, final URL url, final Nac nacosListener); } - private void unsubscribeEventListener(String serviceName, final URL url, final NacosAggregateListener listener) + private void unsubscribeEventListener(String serviceName, final URL url, final NotifyListener listener) throws NacosException { - ConcurrentMap notifyListenerEventListenerConcurrentMap = nacosListeners.get(url); - if (notifyListenerEventListenerConcurrentMap == null) { + ConcurrentMap notifyListenerEventListenerConcurrentMap = nacosListeners.get(url); + if(notifyListenerEventListenerConcurrentMap == null){ return; } EventListener nacosListener = notifyListenerEventListenerConcurrentMap.get(listener); - if (nacosListener == null) { + if(nacosListener == null){ return; } namingService.unsubscribe(serviceName, @@ -567,14 +560,14 @@ private void unsubscribeEventListener(String serviceName, final URL url, final N * @param listener {@link NotifyListener} * @param instances all {@link Instance instances} */ - private void notifySubscriber(URL url, String serviceName, NacosAggregateListener listener, Collection instances) { + private void notifySubscriber(URL url, NotifyListener listener, Collection instances) { List enabledInstances = new LinkedList<>(instances); if (enabledInstances.size() > 0) { // Instances filterEnabledInstances(enabledInstances); } - List aggregatedUrls = toUrlWithEmpty(url, listener.saveAndAggregateAllInstances(serviceName, enabledInstances)); - NacosRegistry.this.notify(url, listener.getNotifyListener(), aggregatedUrls); + List urls = toUrlWithEmpty(url, enabledInstances); + NacosRegistry.this.notify(url, listener, urls); } /**