Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize outbound event and some format #3141

Merged
merged 8 commits into from
Jan 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,7 @@
*/

public abstract class Proxy {
public static final InvocationHandler RETURN_NULL_INVOKER = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
return null;
}
};
public static final InvocationHandler RETURN_NULL_INVOKER = (proxy, method, args) -> null;
public static final InvocationHandler THROW_UNSUPPORTED_INVOKER = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
Expand Down Expand Up @@ -107,11 +102,7 @@ public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
// get cache by class loader.
Map<String, Object> cache;
synchronized (ProxyCacheMap) {
cache = ProxyCacheMap.get(cl);
if (cache == null) {
cache = new HashMap<String, Object>();
ProxyCacheMap.put(cl, cache);
}
cache = ProxyCacheMap.computeIfAbsent(cl, k -> new HashMap<>());
}

Proxy proxy = null;
Expand Down Expand Up @@ -144,8 +135,8 @@ public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
try {
ccp = ClassGenerator.newInstance(cl);

Set<String> worked = new HashSet<String>();
List<Method> methods = new ArrayList<Method>();
Set<String> worked = new HashSet<>();
List<Method> methods = new ArrayList<>();

for (int i = 0; i < ics.length; i++) {
if (!Modifier.isPublic(ics[i].getModifiers())) {
Expand Down Expand Up @@ -175,7 +166,7 @@ public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
for (int j = 0; j < pts.length; j++) {
code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
}
code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
code.append(" Object ret = handler.invoke(this, methods[").append(ix).append("], args);");
if (!Void.TYPE.equals(rt)) {
code.append(" return ").append(asArgument(rt, "ret")).append(";");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
Expand Down Expand Up @@ -51,7 +52,6 @@

/**
* AbstractRegistry. (SPI, Prototype, ThreadSafe)
*
*/
public abstract class AbstractRegistry implements Registry {

Expand All @@ -61,16 +61,16 @@ public abstract class AbstractRegistry implements Registry {
private static final String URL_SPLIT = "\\s+";
// Log output
protected final Logger logger = LoggerFactory.getLogger(getClass());
// Local disk cache, where the special key value.registies records the list of registry centers, and the others are the list of notified service providers
// Local disk cache, where the special key value.registries records the list of registry centers, and the others are the list of notified service providers
private final Properties properties = new Properties();
// File cache timing writing
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
// Is it synchronized to save the file
private final boolean syncSaveFile;
private final AtomicLong lastCacheChanged = new AtomicLong();
private final Set<URL> registered = new ConcurrentHashSet<URL>();
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
private final Set<URL> registered = new ConcurrentHashSet<>();
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>();
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();
private URL registryUrl;
// Local disk cache file
private File file;
Expand All @@ -90,13 +90,15 @@ public AbstractRegistry(URL url) {
}
}
this.file = file;
// When starting the subscription center,
// we need to read the local cache file for future Registry fault tolerance processing.
loadProperties();
notify(url.getBackupUrls());
}

protected static List<URL> filterEmpty(URL url, List<URL> urls) {
if (urls == null || urls.isEmpty()) {
List<URL> result = new ArrayList<URL>(1);
if (CollectionUtils.isEmpty(urls)) {
List<URL> result = new ArrayList<>(1);
result.add(url.setProtocol(Constants.EMPTY_PROTOCOL));
return result;
}
Expand Down Expand Up @@ -221,7 +223,7 @@ public List<URL> getCacheUrls(URL url) {
&& (Character.isLetter(key.charAt(0)) || key.charAt(0) == '_')
&& value != null && value.length() > 0) {
String[] arr = value.trim().split(URL_SPLIT);
List<URL> urls = new ArrayList<URL>();
List<URL> urls = new ArrayList<>();
for (String u : arr) {
urls.add(URL.valueOf(u));
}
Expand All @@ -233,7 +235,7 @@ public List<URL> getCacheUrls(URL url) {

@Override
public List<URL> lookup(URL url) {
List<URL> result = new ArrayList<URL>();
List<URL> result = new ArrayList<>();
Map<String, List<URL>> notifiedUrls = getNotified().get(url);
if (notifiedUrls != null && notifiedUrls.size() > 0) {
for (List<URL> urls : notifiedUrls.values()) {
Expand All @@ -244,11 +246,11 @@ public List<URL> lookup(URL url) {
}
}
} else {
final AtomicReference<List<URL>> reference = new AtomicReference<List<URL>>();
final AtomicReference<List<URL>> reference = new AtomicReference<>();
NotifyListener listener = reference::set;
subscribe(url, listener); // Subscribe logic guarantees the first notify to return
List<URL> urls = reference.get();
if (urls != null && !urls.isEmpty()) {
if (CollectionUtils.isNotEmpty(urls)) {
for (URL u : urls) {
if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
result.add(u);
Expand Down Expand Up @@ -292,10 +294,7 @@ public void subscribe(URL url, NotifyListener listener) {
if (logger.isInfoEnabled()) {
logger.info("Subscribe: " + url);
}

Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, k -> {
return new ConcurrentHashSet<>();
});
Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>());
listeners.add(listener);
}

Expand All @@ -318,7 +317,7 @@ public void unsubscribe(URL url, NotifyListener listener) {

protected void recover() throws Exception {
// register
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
Set<URL> recoverRegistered = new HashSet<>(getRegistered());
if (!recoverRegistered.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover register url " + recoverRegistered);
Expand All @@ -328,7 +327,7 @@ protected void recover() throws Exception {
}
}
// subscribe
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover subscribe url " + recoverSubscribed.keySet());
Expand All @@ -343,7 +342,7 @@ protected void recover() throws Exception {
}

protected void notify(List<URL> urls) {
if (urls == null || urls.isEmpty()) {
if (CollectionUtils.isEmpty(urls)) {
return;
}

Expand All @@ -367,43 +366,49 @@ protected void notify(List<URL> urls) {
}
}

/**
* Notify changes from the Provider side.
*
* @param url consumer side url
* @param listener listener
* @param urls provider latest urls
*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((urls == null || urls.isEmpty())
if ((CollectionUtils.isEmpty(urls))
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
// keep every provider's category.
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> {
return new ArrayList<>();
});
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, k -> {
return new ConcurrentHashMap<>();
});
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
saveProperties(url);
listener.notify(categoryList);
// We will update our cache file after each notification.
// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
saveProperties(url);
}
}

Expand Down Expand Up @@ -442,9 +447,9 @@ public void destroy() {
if (logger.isInfoEnabled()) {
logger.info("Destroy registry:" + getUrl());
}
Set<URL> destroyRegistered = new HashSet<URL>(getRegistered());
Set<URL> destroyRegistered = new HashSet<>(getRegistered());
if (!destroyRegistered.isEmpty()) {
for (URL url : new HashSet<URL>(getRegistered())) {
for (URL url : new HashSet<>(getRegistered())) {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
try {
unregister(url);
Expand All @@ -457,7 +462,7 @@ public void destroy() {
}
}
}
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
if (!destroySubscribed.isEmpty()) {
for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
URL url = entry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.timer.HashedWheelTimer;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.retry.FailedNotifiedTask;
Expand Down Expand Up @@ -291,7 +292,7 @@ public void subscribe(URL url, NotifyListener listener) {
Throwable t = e;

List<URL> urls = getCacheUrls(url);
if (urls != null && !urls.isEmpty()) {
if (CollectionUtils.isNotEmpty(urls)) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ public void unsubscribe(URL url, NotifyListener listener) {

@Override
public List<URL> lookup(URL url) {
List<URL> urls = new ArrayList<URL>();
List<URL> urls = new ArrayList<>();
Map<String, List<URL>> notifiedUrls = getNotified().get(url);
if (notifiedUrls != null && notifiedUrls.size() > 0) {
for (List<URL> values : notifiedUrls.values()) {
Expand Down
Loading