Skip to content

Commit

Permalink
[ISSUE-#4166] Optimize notifyCenter code (#4167)
Browse files Browse the repository at this point in the history
* enhance notify center code quality

* reverse the judge logic
  • Loading branch information
horizonzy committed Nov 9, 2020
1 parent fb61648 commit 48d0c68
Showing 1 changed file with 21 additions and 23 deletions.
Expand Up @@ -18,19 +18,19 @@

import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
import com.alibaba.nacos.common.JustForTest;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.notify.listener.SmartSubscriber;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.BiFunction;
import com.alibaba.nacos.common.utils.ClassUtils;
import com.alibaba.nacos.common.utils.MapUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.NoSuchElementException;
import java.util.Iterator;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -170,7 +170,6 @@ public static void shutdown() {
* @param <T> event type
*/
public static <T> void registerSubscriber(final Subscriber consumer) {
final Class<? extends Event> cls = consumer.subscribeType();
// If you want to listen to multiple events, you do it separately,
// based on subclass's subscribeTypes method return list, it can register to publisher.
if (consumer instanceof SmartSubscriber) {
Expand All @@ -186,16 +185,17 @@ public static <T> void registerSubscriber(final Subscriber consumer) {
return;
}

if (ClassUtils.isAssignableFrom(SlowEvent.class, cls)) {
INSTANCE.sharePublisher.addSubscriber(consumer, cls);
final Class<? extends Event> subscribeType = consumer.subscribeType();
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
return;
}

addSubscriber(consumer, consumer.subscribeType());
addSubscriber(consumer, subscribeType);
}

/**
* Add a subscriber to pusblisher.
* Add a subscriber to publisher.
*
* @param consumer subscriber instance.
* @param subscribeType subscribeType.
Expand All @@ -217,7 +217,6 @@ private static void addSubscriber(final Subscriber consumer, Class<? extends Eve
* @param consumer subscriber instance.
*/
public static <T> void deregisterSubscriber(final Subscriber consumer) {
final Class<? extends Event> cls = consumer.subscribeType();
if (consumer instanceof SmartSubscriber) {
for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
Expand All @@ -229,15 +228,16 @@ public static <T> void deregisterSubscriber(final Subscriber consumer) {
return;
}

if (ClassUtils.isAssignableFrom(SlowEvent.class, cls)) {
INSTANCE.sharePublisher.removeSubscriber(consumer, cls);
final Class<? extends Event> subscribeType = consumer.subscribeType();
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.removeSubscriber(consumer, subscribeType);
return;
}

if (removeSubscriber(consumer, consumer.subscribeType())) {
if (removeSubscriber(consumer, subscribeType)) {
return;
}
throw new NoSuchElementException("The subcriber has no event publisher");
throw new NoSuchElementException("The subscriber has no event publisher");
}

/**
Expand All @@ -250,12 +250,11 @@ public static <T> void deregisterSubscriber(final Subscriber consumer) {
private static boolean removeSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {

final String topic = ClassUtils.getCanonicalName(subscribeType);
if (INSTANCE.publisherMap.containsKey(topic)) {
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
publisher.removeSubscriber(consumer);
EventPublisher eventPublisher = INSTANCE.publisherMap.get(topic);
if (eventPublisher != null) {
eventPublisher.removeSubscriber(consumer);
return true;
}

return false;
}

Expand All @@ -281,16 +280,16 @@ public static boolean publishEvent(final Event event) {
* @param event event instance.
*/
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
final String topic = ClassUtils.getCanonicalName(eventType);
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher.publish(event);
}

if (INSTANCE.publisherMap.containsKey(topic)) {
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
final String topic = ClassUtils.getCanonicalName(eventType);

EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher != null) {
return publisher.publish(event);
}

LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
return false;
}
Expand Down Expand Up @@ -321,8 +320,7 @@ public static EventPublisher registerToPublisher(final Class<? extends Event> ev
// MapUtils.computeIfAbsent is a unsafe method.
MapUtils.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, eventType, queueMaxSize);
}
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
return publisher;
return INSTANCE.publisherMap.get(topic);
}

/**
Expand Down

0 comments on commit 48d0c68

Please sign in to comment.