Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Code review around AbstractConfiguratorListener #3077

Merged
merged 8 commits into from
Dec 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,11 @@ public Router getRouter(URL url) {
}
String rule = IOUtils.read(new FileReader(new File(url.getAbsolutePath())));

// FIXME: this code looks useless
boolean runtime = url.getParameter(Constants.RUNTIME_KEY, false);
URL script = url.setProtocol(protocol).addParameter(Constants.TYPE_KEY, type).addParameter(Constants.RUNTIME_KEY, runtime).addParameterAndEncoded(Constants.RULE_KEY, rule);
URL script = url.setProtocol(protocol).addParameter(Constants.TYPE_KEY, type)
.addParameter(Constants.RUNTIME_KEY, runtime)
.addParameterAndEncoded(Constants.RULE_KEY, rule);

return routerFactory.getRouter(script);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class ScriptRouter extends AbstractRouter {
public static final String NAME = "SCRIPT_ROUTER";
private static final Logger logger = LoggerFactory.getLogger(ScriptRouter.class);

private static final Map<String, ScriptEngine> engines = new ConcurrentHashMap<String, ScriptEngine>();
private static final Map<String, ScriptEngine> engines = new ConcurrentHashMap<>();

private final ScriptEngine engine;

Expand All @@ -62,13 +62,13 @@ public ScriptRouter(URL url) {
type = Constants.DEFAULT_SCRIPT_TYPE_KEY;
}
if (rule == null || rule.length() == 0) {
throw new IllegalStateException(new IllegalStateException("route rule can not be empty. rule:" + rule));
throw new IllegalStateException("route rule can not be empty. rule:" + rule);
}
ScriptEngine engine = engines.get(type);
if (engine == null) {
engine = new ScriptEngineManager().getEngineByName(type);
if (engine == null) {
throw new IllegalStateException(new IllegalStateException("Unsupported route rule type: " + type + ", rule: " + rule));
throw new IllegalStateException("unsupported route rule type: " + type + ", rule: " + rule);
}
engines.put(type, engine);
}
Expand All @@ -85,7 +85,7 @@ public URL getUrl() {
@SuppressWarnings("unchecked")
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
try {
List<Invoker<T>> invokersCopy = new ArrayList<Invoker<T>>(invokers);
List<Invoker<T>> invokersCopy = new ArrayList<>(invokers);
Compilable compilable = (Compilable) engine;
Bindings bindings = engine.createBindings();
bindings.put("invokers", invokersCopy);
Expand All @@ -105,8 +105,8 @@ public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation
}
return invokersCopy;
} catch (ScriptException e) {
//fail then ignore rule .invokers.
logger.error("route error , rule has been ignored. rule: " + rule + ", method:" + invocation.getMethodName() + ", url: " + RpcContext.getContext().getUrl(), e);
logger.error("route error, rule has been ignored. rule: " + rule + ", method:" +
invocation.getMethodName() + ", url: " + RpcContext.getContext().getUrl(), e);
return invokers;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import static org.apache.dubbo.common.Constants.TAG_KEY;

/**
*
* TagRouter
*/
public class TagRouter extends AbstractRouter implements Comparable<Router>, ConfigurationListener {
public static final String NAME = "TAG_ROUTER";
Expand Down Expand Up @@ -117,10 +117,9 @@ public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation
}
// FAILOVER: return all Providers without any tags.
else {
List<Invoker<T>> tmp = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(), tagRouterRule
.getAddresses()));
return filterInvoker(tmp, invoker -> StringUtils.isEmpty(invoker.getUrl()
.getParameter(TAG_KEY)));
List<Invoker<T>> tmp = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(),
tagRouterRule.getAddresses()));
return filterInvoker(tmp, invoker -> StringUtils.isEmpty(invoker.getUrl().getParameter(TAG_KEY)));
}
} else {
// List<String> addresses = tagRouterRule.filter(providerApp);
Expand All @@ -137,10 +136,7 @@ public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation
}
return filterInvoker(result, invoker -> {
String localTag = invoker.getUrl().getParameter(TAG_KEY);
if (StringUtils.isEmpty(localTag) || !tagRouterRule.getTagNames().contains(localTag)) {
return true;
}
return false;
return StringUtils.isEmpty(localTag) || !tagRouterRule.getTagNames().contains(localTag);
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,40 @@

import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.configcenter.ConfigChangeEvent;
import org.apache.dubbo.configcenter.ConfigChangeType;
import org.apache.dubbo.configcenter.ConfigurationListener;
import org.apache.dubbo.configcenter.DynamicConfiguration;
import org.apache.dubbo.rpc.cluster.Configurator;
import org.apache.dubbo.rpc.cluster.configurator.parser.ConfigParser;

import java.util.LinkedList;
import java.util.Collections;
import java.util.List;

/**
*
* AbstractConfiguratorListener
*/
public abstract class AbstractConfiguratorListener implements ConfigurationListener {
private static final Logger logger = LoggerFactory.getLogger(AbstractConfiguratorListener.class);

protected List<Configurator> configurators = new LinkedList<>();
protected List<Configurator> configurators = Collections.emptyList();


protected final void initWith(String key) {
DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration();
dynamicConfiguration.addListener(key, this);
String rawConfig = dynamicConfiguration.getConfig(key);
if (!StringUtils.isEmpty(rawConfig)) {
process(new ConfigChangeEvent(key, rawConfig));
}
}

@Override
public void process(ConfigChangeEvent event) {
if (logger.isInfoEnabled()) {
logger.info("Notification of overriding rule, change type is: " + event.getChangeType() + ", raw config content is:\n " + event
.getValue());
logger.info("Notification of overriding rule, change type is: " + event.getChangeType() +
", raw config content is:\n " + event.getValue());
}

if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
Expand All @@ -50,8 +62,8 @@ public void process(ConfigChangeEvent event) {
configurators = Configurator.toConfigurators(ConfigParser.parseConfigurators(event.getValue()))
.orElse(configurators);
} catch (Exception e) {
logger.error("Failed to parse raw dynamic config and it will not take effect, the raw config is: " + event
.getValue(), e);
logger.error("Failed to parse raw dynamic config and it will not take effect, the raw config is: " +
event.getValue(), e);
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.configcenter.ConfigChangeEvent;
import org.apache.dubbo.configcenter.DynamicConfiguration;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.Registry;
Expand Down Expand Up @@ -157,7 +157,7 @@ public void setRegistry(Registry registry) {
public void subscribe(URL url) {
setConsumerUrl(url);
consumerConfigurationListener.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(url);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
registry.subscribe(url, this);
}

Expand Down Expand Up @@ -223,7 +223,9 @@ public void refreshOverrideAndInvoker(List<URL> urls) {
*/
// TODO: 2017/8/31 FIXME The thread pool should be used to refresh the address, otherwise the task may be accumulated.
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls
Assert.notNull(invokerUrls, "invokerUrls should not be null");

if (invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls
.get(0)
.getProtocol())) {
this.forbidden = true; // Forbid to access
Expand All @@ -233,7 +235,7 @@ private void refreshInvoker(List<URL> invokerUrls) {
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls == null) {
if (invokerUrls == Collections.<URL>emptyList()) {
invokerUrls = new ArrayList<>();
}
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
Expand Down Expand Up @@ -753,54 +755,37 @@ public URL getProviderUrl() {
}
}

public class ReferenceConfigurationListener extends AbstractConfiguratorListener {
private static class ReferenceConfigurationListener extends AbstractConfiguratorListener {
private RegistryDirectory directory;
private URL url;

ReferenceConfigurationListener(URL url) {
ReferenceConfigurationListener(RegistryDirectory directory, URL url) {
this.directory = directory;
this.url = url;
this.init();
}

private synchronized void init() {
String key = url.getEncodedServiceKey() + Constants.CONFIGURATORS_SUFFIX;
DynamicConfiguration.getDynamicConfiguration().addListener(key, this);
String rawConfig = DynamicConfiguration.getDynamicConfiguration().getConfig(key);
if (rawConfig != null) {
this.process(new ConfigChangeEvent(key, rawConfig));
}
this.initWith(url.getEncodedServiceKey() + Constants.CONFIGURATORS_SUFFIX);
}

@Override
protected void notifyOverrides() {
// 'null' means notification of configurators or routers.
RegistryDirectory.this.refreshInvoker(null);
// to notify configurator/router changes
directory.refreshInvoker(Collections.emptyList());
}
}

private static class ConsumerConfigurationListener extends AbstractConfiguratorListener {
List<RegistryDirectory> listeners = new ArrayList<>();


void addNotifyListener(RegistryDirectory listener) {
this.listeners.add(listener);
}

ConsumerConfigurationListener() {
this.init();
this.initWith(ApplicationModel.getApplication() + Constants.CONFIGURATORS_SUFFIX);
}

private synchronized void init() {
String appKey = ApplicationModel.getApplication() + Constants.CONFIGURATORS_SUFFIX;
DynamicConfiguration.getDynamicConfiguration().addListener(appKey, this);
String appRawConfig = DynamicConfiguration.getDynamicConfiguration().getConfig(appKey);
if (appRawConfig != null) {
process(new ConfigChangeEvent(appKey, appRawConfig));
}
void addNotifyListener(RegistryDirectory listener) {
this.listeners.add(listener);
}

@Override
protected void notifyOverrides() {
listeners.forEach(listener -> listener.refreshInvoker(null));
listeners.forEach(listener -> listener.refreshInvoker(Collections.emptyList()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.Configurator;
import org.apache.dubbo.rpc.cluster.configurator.parser.ConfigParser;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.protocol.InvokerWrapper;

Expand Down Expand Up @@ -537,18 +536,7 @@ private class ServiceConfigurationListener extends AbstractConfiguratorListener
public ServiceConfigurationListener(URL providerUrl, OverrideListener notifyListener) {
this.providerUrl = providerUrl;
this.notifyListener = notifyListener;
this.init();
}

private synchronized void init() {
DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration();
String key = providerUrl.getEncodedServiceKey() + Constants.CONFIGURATORS_SUFFIX;
dynamicConfiguration.addListener(key, this);
String rawConfig = dynamicConfiguration.getConfig(key);
if (!StringUtils.isEmpty(rawConfig)) {
configurators = Configurator.toConfigurators(ConfigParser.parseConfigurators(rawConfig))
.orElse(configurators);
}
this.initWith(providerUrl.getEncodedServiceKey() + Constants.CONFIGURATORS_SUFFIX);
}

private <T> URL overrideUrl(URL providerUrl) {
Expand All @@ -564,18 +552,7 @@ protected void notifyOverrides() {
private class ProviderConfigurationListener extends AbstractConfiguratorListener {

public ProviderConfigurationListener() {
this.init();
}

private synchronized void init() {
DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration();
String appKey = ApplicationModel.getApplication() + Constants.CONFIGURATORS_SUFFIX;
dynamicConfiguration.addListener(appKey, this);
String appRawConfig = dynamicConfiguration.getConfig(appKey);
if (!StringUtils.isEmpty(appRawConfig)) {
configurators = Configurator.toConfigurators(ConfigParser.parseConfigurators(appRawConfig))
.orElse(configurators);
}
this.initWith(ApplicationModel.getApplication() + Constants.CONFIGURATORS_SUFFIX);
}

/**
Expand Down