Skip to content

Commit

Permalink
Fix MappingListener override in ServiceDiscoveryRegistry (#13277)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlbumenJ committed Nov 1, 2023
1 parent 6b634b7 commit 06e9410
Showing 1 changed file with 30 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashMapUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.metadata.AbstractServiceNameMapping;
import org.apache.dubbo.metadata.MappingChangedEvent;
import org.apache.dubbo.metadata.MappingListener;
Expand All @@ -37,11 +38,13 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_SIDE;
Expand Down Expand Up @@ -79,7 +82,7 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {

/* apps - listener */
private final Map<String, ServiceInstancesChangedListener> serviceListeners = new ConcurrentHashMap<>();
private final Map<String, MappingListener> mappingListeners = new ConcurrentHashMap<>();
private final Map<String, Set<MappingListener>> mappingListeners = new ConcurrentHashMap<>();
/* This lock has the same scope and lifecycle as its corresponding instance listener.
It's used to make sure that only one interface mapping to the same app list can do subscribe or unsubscribe at the same moment.
And the lock should be destroyed when listener destroying its corresponding instance listener.
Expand Down Expand Up @@ -112,7 +115,7 @@ public ServiceDiscovery getServiceDiscovery() {
*/
protected ServiceDiscovery createServiceDiscovery(URL registryURL) {
return getServiceDiscovery(registryURL.addParameter(INTERFACE_KEY, ServiceDiscovery.class.getName())
.removeParameter(REGISTRY_TYPE_KEY));
.removeParameter(REGISTRY_TYPE_KEY));
}

/**
Expand Down Expand Up @@ -206,7 +209,10 @@ public void doSubscribe(URL url, NotifyListener listener) {
try {
MappingListener mappingListener = new DefaultMappingListener(url, mappingByUrl, listener);
mappingByUrl = serviceNameMapping.getAndListen(this.getUrl(), url, mappingListener);
mappingListeners.put(url.getProtocolServiceKey(), mappingListener);
synchronized (mappingListeners) {
mappingListeners.computeIfAbsent(url.getProtocolServiceKey(), (k) -> new ConcurrentHashSet<>())
.add(mappingListener);
}
} catch (Exception e) {
logger.warn(INTERNAL_ERROR, "", "", "Cannot find app mapping for service " + url.getServiceInterface() + ", will not migrate.", e);
}
Expand Down Expand Up @@ -248,8 +254,23 @@ public void doUnsubscribe(URL url, NotifyListener listener) {
serviceDiscovery.unsubscribe(url, listener);
String protocolServiceKey = url.getProtocolServiceKey();
Set<String> serviceNames = serviceNameMapping.getMapping(url);
if (mappingListeners.get(protocolServiceKey) != null) {
serviceNameMapping.stopListen(url, mappingListeners.remove(protocolServiceKey));

synchronized (mappingListeners) {
Set<MappingListener> keyedListeners = mappingListeners.get(protocolServiceKey);
if (keyedListeners != null) {
List<MappingListener> matched = keyedListeners.stream()
.filter(mappingListener ->
mappingListener instanceof DefaultMappingListener
&& (Objects.equals(((DefaultMappingListener) mappingListener).getListener(), listener)))
.collect(Collectors.toList());
for (MappingListener mappingListener : matched) {
serviceNameMapping.stopListen(url, mappingListener);
keyedListeners.remove(mappingListener);
}
if (keyedListeners.isEmpty()) {
mappingListeners.remove(protocolServiceKey, Collections.emptySet());
}
}
}
if (CollectionUtils.isNotEmpty(serviceNames)) {
String serviceNamesKey = toStringKeys(serviceNames);
Expand Down Expand Up @@ -430,6 +451,10 @@ public synchronized void onEvent(MappingChangedEvent event) {
}
}

protected NotifyListener getListener() {
return listener;
}

@Override
public void stop() {
stopped = true;
Expand Down

0 comments on commit 06e9410

Please sign in to comment.