Skip to content

Commit

Permalink
[2.7] Fix Nacos aggregate listen (apache#10467)
Browse files Browse the repository at this point in the history
* [2.7] Fix Nacos aggregate listen

* fix empty

* rename
  • Loading branch information
AlbumenJ committed Aug 16, 2022
1 parent cac4750 commit 8b580d0
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 32 deletions.
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.nacos;

import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.registry.NotifyListener;

import com.alibaba.nacos.api.naming.pojo.Instance;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class NacosAggregateListener {
private final NotifyListener notifyListener;
private final Set<String> serviceNames = new ConcurrentHashSet<>();
private final Map<String, List<Instance>> serviceInstances = new ConcurrentHashMap<>();

public NacosAggregateListener(NotifyListener notifyListener) {
this.notifyListener = notifyListener;
}

public List<Instance> saveAndAggregateAllInstances(String serviceName, List<Instance> instances) {
serviceNames.add(serviceName);
serviceInstances.put(serviceName, instances);
return serviceInstances.values().stream().flatMap(List::stream).collect(Collectors.toList());
}

public NotifyListener getNotifyListener() {
return notifyListener;
}

public Set<String> getServiceNames() {
return serviceNames;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
NacosAggregateListener that = (NacosAggregateListener) o;
return Objects.equals(notifyListener, that.notifyListener) && Objects.equals(serviceNames, that.serviceNames) && Objects.equals(serviceInstances, that.serviceInstances);
}

@Override
public int hashCode() {
return Objects.hash(notifyListener, serviceNames, serviceInstances);
}
}
Expand Up @@ -34,24 +34,23 @@
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ListView;
import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.Collections;

import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
Expand Down Expand Up @@ -124,7 +123,9 @@ public class NacosRegistry extends FailbackRegistry {

private final NacosNamingServiceWrapper namingService;

private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, EventListener>> nacosListeners = new ConcurrentHashMap<>();
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, NacosAggregateListener>> originToAggregateListener = new ConcurrentHashMap<>();

private final ConcurrentMap<URL, ConcurrentMap<NacosAggregateListener, EventListener>> nacosListeners = new ConcurrentHashMap<>();

public NacosRegistry(URL url, NacosNamingServiceWrapper namingService) {
super(url);
Expand Down Expand Up @@ -179,22 +180,23 @@ public void doUnregister(final URL url) {

@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
Set<String> serviceNames = getServiceNames(url, listener);
NacosAggregateListener nacosAggregateListener = new NacosAggregateListener(listener);
originToAggregateListener.computeIfAbsent(url, k -> new ConcurrentHashMap<>()).put(listener, nacosAggregateListener);

Set<String> serviceNames = getServiceNames(url, nacosAggregateListener);

//Set corresponding serviceNames for easy search later
if (isServiceNamesWithCompatibleMode(url)) {
for (String serviceName : serviceNames) {
NacosInstanceManageUtil.setCorrespondingServiceNames(serviceName, serviceNames);
}
}

doSubscribe(url, listener, serviceNames);
doSubscribe(url, nacosAggregateListener, serviceNames);
}

private void doSubscribe(final URL url, final NotifyListener listener, final Set<String> serviceNames) {
private void doSubscribe(final URL url, final NacosAggregateListener listener, final Set<String> serviceNames) {
execute(namingService -> {
if (isServiceNamesWithCompatibleMode(url)) {
List<Instance> allCorrespondingInstanceList = Lists.newArrayList();

/**
* Get all instances with serviceNames to avoid instance overwrite and but with empty instance mentioned
Expand All @@ -209,9 +211,8 @@ private void doSubscribe(final URL url, final NotifyListener listener, final Set
List<Instance> instances = namingService.getAllInstances(serviceName,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP));
NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
allCorrespondingInstanceList.addAll(instances);
notifySubscriber(url, serviceName, listener, instances);
}
notifySubscriber(url, listener, allCorrespondingInstanceList);
for (String serviceName : serviceNames) {
subscribeEventListener(serviceName, url, listener);
}
Expand All @@ -220,7 +221,7 @@ private void doSubscribe(final URL url, final NotifyListener listener, final Set
for (String serviceName : serviceNames) {
instances.addAll(namingService.getAllInstances(serviceName
, getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)));
notifySubscriber(url, listener, instances);
notifySubscriber(url, serviceName, listener, instances);
subscribeEventListener(serviceName, url, listener);
}
}
Expand All @@ -243,18 +244,24 @@ private boolean isServiceNamesWithCompatibleMode(final URL url) {
public void doUnsubscribe(URL url, NotifyListener listener) {
if (isAdminProtocol(url)) {
shutdownServiceNamesLookup();
}
else {
Set<String> serviceNames = getServiceNames(url, listener);
} else {
Map<NotifyListener, NacosAggregateListener> listenerMap = originToAggregateListener.get(url);
NacosAggregateListener nacosAggregateListener = listenerMap.remove(listener);
if (nacosAggregateListener != null) {
Set<String> serviceNames = getServiceNames(url, nacosAggregateListener);

doUnsubscribe(url, listener, serviceNames);
doUnsubscribe(url, nacosAggregateListener, serviceNames);
}
if (listenerMap.isEmpty()) {
originToAggregateListener.remove(url);
}
}
}

private void doUnsubscribe(final URL url, final NotifyListener listener, final Set<String> serviceNames) {
private void doUnsubscribe(final URL url, final NacosAggregateListener nacosAggregateListener, final Set<String> serviceNames) {
execute(namingService -> {
for (String serviceName : serviceNames) {
unsubscribeEventListener(serviceName, url, listener);
unsubscribeEventListener(serviceName, url, nacosAggregateListener);
}
});
}
Expand All @@ -272,7 +279,7 @@ private void shutdownServiceNamesLookup() {
* @param listener {@link NotifyListener}
* @return non-null
*/
private Set<String> getServiceNames(URL url, NotifyListener listener) {
private Set<String> getServiceNames(URL url, NacosAggregateListener listener) {
if (isAdminProtocol(url)) {
scheduleServiceNamesLookup(url, listener);
return getServiceNamesForOps(url);
Expand Down Expand Up @@ -306,7 +313,7 @@ private Set<String> filterServiceNames(NacosServiceName serviceName) {
Set<String> serviceNames = new LinkedHashSet<>();

execute(namingService -> serviceNames.addAll(namingService.getServicesOfServer(1, Integer.MAX_VALUE,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)).getData()
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP)).getData()
.stream()
.filter(this::isConformRules)
.map(NacosServiceName::new)
Expand Down Expand Up @@ -357,7 +364,7 @@ private boolean isAdminProtocol(URL url) {
return ADMIN_PROTOCOL.equals(url.getProtocol());
}

private void scheduleServiceNamesLookup(final URL url, final NotifyListener listener) {
private void scheduleServiceNamesLookup(final URL url, final NacosAggregateListener listener) {
if (scheduledExecutorService == null) {
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(() -> {
Expand Down Expand Up @@ -510,9 +517,9 @@ private List<URL> buildURLs(URL consumerURL, Collection<Instance> instances) {
return urls;
}

private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
private void subscribeEventListener(String serviceName, final URL url, final NacosAggregateListener listener)
throws NacosException {
ConcurrentMap<NotifyListener, EventListener> listeners = nacosListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ConcurrentMap<NacosAggregateListener, EventListener> listeners = nacosListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
EventListener nacosListener = listeners.computeIfAbsent(listener, k -> {
EventListener eventListener = event -> {
if (event instanceof NamingEvent) {
Expand All @@ -528,7 +535,7 @@ private void subscribeEventListener(String serviceName, final URL url, final Not
instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName);
}

notifySubscriber(url, listener, instances);
notifySubscriber(url, serviceName, listener, instances);
}
};
return eventListener;
Expand All @@ -538,14 +545,14 @@ private void subscribeEventListener(String serviceName, final URL url, final Not
nacosListener);
}

private void unsubscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
private void unsubscribeEventListener(String serviceName, final URL url, final NacosAggregateListener listener)
throws NacosException {
ConcurrentMap<NotifyListener, EventListener> notifyListenerEventListenerConcurrentMap = nacosListeners.get(url);
if(notifyListenerEventListenerConcurrentMap == null){
ConcurrentMap<NacosAggregateListener, EventListener> notifyListenerEventListenerConcurrentMap = nacosListeners.get(url);
if (notifyListenerEventListenerConcurrentMap == null) {
return;
}
EventListener nacosListener = notifyListenerEventListenerConcurrentMap.get(listener);
if(nacosListener == null){
if (nacosListener == null) {
return;
}
namingService.unsubscribe(serviceName,
Expand All @@ -560,14 +567,14 @@ private void unsubscribeEventListener(String serviceName, final URL url, final N
* @param listener {@link NotifyListener}
* @param instances all {@link Instance instances}
*/
private void notifySubscriber(URL url, NotifyListener listener, Collection<Instance> instances) {
private void notifySubscriber(URL url, String serviceName, NacosAggregateListener listener, Collection<Instance> instances) {
List<Instance> enabledInstances = new LinkedList<>(instances);
if (enabledInstances.size() > 0) {
// Instances
filterEnabledInstances(enabledInstances);
}
List<URL> urls = toUrlWithEmpty(url, enabledInstances);
NacosRegistry.this.notify(url, listener, urls);
List<URL> aggregatedUrls = toUrlWithEmpty(url, listener.saveAndAggregateAllInstances(serviceName, enabledInstances));
NacosRegistry.this.notify(url, listener.getNotifyListener(), aggregatedUrls);
}

/**
Expand Down

0 comments on commit 8b580d0

Please sign in to comment.