Skip to content

Commit

Permalink
Merge pull request #3141, optimize outbound event and some code forma…
Browse files Browse the repository at this point in the history
…tting.
  • Loading branch information
carryxyh authored and chickenlj committed Jan 24, 2019
1 parent 6a87dc5 commit 9cdb2f0
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,7 @@
*/

public abstract class Proxy {
public static final InvocationHandler RETURN_NULL_INVOKER = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
return null;
}
};
public static final InvocationHandler RETURN_NULL_INVOKER = (proxy, method, args) -> null;
public static final InvocationHandler THROW_UNSUPPORTED_INVOKER = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
Expand Down Expand Up @@ -108,11 +103,7 @@ public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
// get cache by class loader.
Map<String, Object> cache;
synchronized (ProxyCacheMap) {
cache = ProxyCacheMap.get(cl);
if (cache == null) {
cache = new HashMap<String, Object>();
ProxyCacheMap.put(cl, cache);
}
cache = ProxyCacheMap.computeIfAbsent(cl, k -> new HashMap<>());
}

Proxy proxy = null;
Expand Down Expand Up @@ -145,8 +136,8 @@ public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
try {
ccp = ClassGenerator.newInstance(cl);

Set<String> worked = new HashSet<String>();
List<Method> methods = new ArrayList<Method>();
Set<String> worked = new HashSet<>();
List<Method> methods = new ArrayList<>();

for (int i = 0; i < ics.length; i++) {
if (!Modifier.isPublic(ics[i].getModifiers())) {
Expand Down Expand Up @@ -176,7 +167,7 @@ public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
for (int j = 0; j < pts.length; j++) {
code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
}
code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
code.append(" Object ret = handler.invoke(this, methods[").append(ix).append("], args);");
if (!Void.TYPE.equals(rt)) {
code.append(" return ").append(asArgument(rt, "ret")).append(";");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
Expand Down Expand Up @@ -52,7 +53,6 @@

/**
* AbstractRegistry. (SPI, Prototype, ThreadSafe)
*
*/
public abstract class AbstractRegistry implements Registry {

Expand All @@ -62,16 +62,16 @@ public abstract class AbstractRegistry implements Registry {
private static final String URL_SPLIT = "\\s+";
// Log output
protected final Logger logger = LoggerFactory.getLogger(getClass());
// Local disk cache, where the special key value.registies records the list of registry centers, and the others are the list of notified service providers
// Local disk cache, where the special key value.registries records the list of registry centers, and the others are the list of notified service providers
private final Properties properties = new Properties();
// File cache timing writing
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
// Is it synchronized to save the file
private final boolean syncSaveFile;
private final AtomicLong lastCacheChanged = new AtomicLong();
private final Set<URL> registered = new ConcurrentHashSet<URL>();
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
private final Set<URL> registered = new ConcurrentHashSet<>();
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>();
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();
private URL registryUrl;
// Local disk cache file
private File file;
Expand All @@ -91,13 +91,15 @@ public AbstractRegistry(URL url) {
}
}
this.file = file;
// When starting the subscription center,
// we need to read the local cache file for future Registry fault tolerance processing.
loadProperties();
notify(url.getBackupUrls());
}

protected static List<URL> filterEmpty(URL url, List<URL> urls) {
if (urls == null || urls.isEmpty()) {
List<URL> result = new ArrayList<URL>(1);
if (CollectionUtils.isEmpty(urls)) {
List<URL> result = new ArrayList<>(1);
result.add(url.setProtocol(Constants.EMPTY_PROTOCOL));
return result;
}
Expand Down Expand Up @@ -222,7 +224,7 @@ public List<URL> getCacheUrls(URL url) {
&& (Character.isLetter(key.charAt(0)) || key.charAt(0) == '_')
&& value != null && value.length() > 0) {
String[] arr = value.trim().split(URL_SPLIT);
List<URL> urls = new ArrayList<URL>();
List<URL> urls = new ArrayList<>();
for (String u : arr) {
urls.add(URL.valueOf(u));
}
Expand All @@ -234,7 +236,7 @@ public List<URL> getCacheUrls(URL url) {

@Override
public List<URL> lookup(URL url) {
List<URL> result = new ArrayList<URL>();
List<URL> result = new ArrayList<>();
Map<String, List<URL>> notifiedUrls = getNotified().get(url);
if (notifiedUrls != null && notifiedUrls.size() > 0) {
for (List<URL> urls : notifiedUrls.values()) {
Expand All @@ -245,7 +247,7 @@ public List<URL> lookup(URL url) {
}
}
} else {
final AtomicReference<List<URL>> reference = new AtomicReference<List<URL>>();
final AtomicReference<List<URL>> reference = new AtomicReference<>();
NotifyListener listener = reference::set;
subscribe(url, listener); // Subscribe logic guarantees the first notify to return
List<URL> urls = reference.get();
Expand Down Expand Up @@ -293,10 +295,7 @@ public void subscribe(URL url, NotifyListener listener) {
if (logger.isInfoEnabled()) {
logger.info("Subscribe: " + url);
}

Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, k -> {
return new ConcurrentHashSet<>();
});
Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>());
listeners.add(listener);
}

Expand All @@ -319,7 +318,7 @@ public void unsubscribe(URL url, NotifyListener listener) {

protected void recover() throws Exception {
// register
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
Set<URL> recoverRegistered = new HashSet<>(getRegistered());
if (!recoverRegistered.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover register url " + recoverRegistered);
Expand All @@ -329,7 +328,7 @@ protected void recover() throws Exception {
}
}
// subscribe
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover subscribe url " + recoverSubscribed.keySet());
Expand All @@ -344,7 +343,7 @@ protected void recover() throws Exception {
}

protected void notify(List<URL> urls) {
if (urls == null || urls.isEmpty()) {
if (CollectionUtils.isEmpty(urls)) {
return;
}

Expand All @@ -368,43 +367,49 @@ protected void notify(List<URL> urls) {
}
}

/**
* Notify changes from the Provider side.
*
* @param url consumer side url
* @param listener listener
* @param urls provider latest urls
*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((urls == null || urls.isEmpty())
if ((CollectionUtils.isEmpty(urls))
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
// keep every provider's category.
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> {
return new ArrayList<>();
});
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, k -> {
return new ConcurrentHashMap<>();
});
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
saveProperties(url);
listener.notify(categoryList);
// We will update our cache file after each notification.
// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
saveProperties(url);
}
}

Expand Down Expand Up @@ -443,9 +448,9 @@ public void destroy() {
if (logger.isInfoEnabled()) {
logger.info("Destroy registry:" + getUrl());
}
Set<URL> destroyRegistered = new HashSet<URL>(getRegistered());
Set<URL> destroyRegistered = new HashSet<>(getRegistered());
if (!destroyRegistered.isEmpty()) {
for (URL url : new HashSet<URL>(getRegistered())) {
for (URL url : new HashSet<>(getRegistered())) {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
try {
unregister(url);
Expand All @@ -458,7 +463,7 @@ public void destroy() {
}
}
}
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
if (!destroySubscribed.isEmpty()) {
for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
URL url = entry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ public void unsubscribe(URL url, NotifyListener listener) {

@Override
public List<URL> lookup(URL url) {
List<URL> urls = new ArrayList<URL>();
List<URL> urls = new ArrayList<>();
Map<String, List<URL>> notifiedUrls = getNotified().get(url);
if (notifiedUrls != null && notifiedUrls.size() > 0) {
for (List<URL> values : notifiedUrls.values()) {
Expand Down
Loading

0 comments on commit 9cdb2f0

Please sign in to comment.