Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE-#3880] NamingService Client support pushEmptyProtection. #4665

Merged
merged 2 commits into from Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/src/main/java/com/alibaba/nacos/api/PropertyKeyConst.java
Expand Up @@ -66,6 +66,8 @@ public class PropertyKeyConst {
public static final String NAMING_POLLING_THREAD_COUNT = "namingPollingThreadCount";

public static final String NAMING_REQUEST_DOMAIN_RETRY_COUNT = "namingRequestDomainMaxRetryCount";

public static final String NAMING_PUSH_EMPTY_PROTECTION = "namingPushEmptyProtection";

/**
* Get the key value of some variable value from the system property.
Expand Down
Expand Up @@ -167,6 +167,10 @@ public boolean validate() {
return true;
}

if (hosts == null) {
return false;
}

List<Instance> validHosts = new ArrayList<Instance>();
for (Instance host : hosts) {
if (!host.isHealthy()) {
Expand All @@ -177,8 +181,8 @@ public boolean validate() {
validHosts.add(host);
}
}

return true;
//No valid hosts, return false.
return !validHosts.isEmpty();
}

@JsonIgnore
Expand Down
Expand Up @@ -93,7 +93,7 @@ private void init(Properties properties) throws NacosException {
this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties);
this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties));
this.hostReactor = new HostReactor(this.serverProxy, beatReactor, this.cacheDir, isLoadCacheAtStart(properties),
initPollingThreadCount(properties));
isPushEmptyProtect(properties), initPollingThreadCount(properties));
}

private int initClientBeatThreadCount(Properties properties) {
Expand Down Expand Up @@ -126,6 +126,16 @@ private boolean isLoadCacheAtStart(Properties properties) {
return loadCacheAtStart;
}

private boolean isPushEmptyProtect(Properties properties) {
boolean pushEmptyProtection = false;
if (properties != null && StringUtils
.isNotEmpty(properties.getProperty(PropertyKeyConst.NAMING_PUSH_EMPTY_PROTECTION))) {
pushEmptyProtection = ConvertUtils
.toBoolean(properties.getProperty(PropertyKeyConst.NAMING_PUSH_EMPTY_PROTECTION));
}
return pushEmptyProtection;
}

private void initServerAddr(Properties properties) {
serverList = properties.getProperty(PropertyKeyConst.SERVER_ADDR);
endpoint = InitUtils.initEndpoint(properties);
Expand Down
Expand Up @@ -78,16 +78,18 @@ public class HostReactor implements Closeable {

private final String cacheDir;

private final boolean pushEmptyProtection;

private final ScheduledExecutorService executor;

private final InstancesChangeNotifier notifier;

public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir) {
this(serverProxy, beatReactor, cacheDir, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
this(serverProxy, beatReactor, cacheDir, false, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
}

public HostReactor(NamingProxy serverProxy, BeatReactor beatReactor, String cacheDir, boolean loadCacheAtStart,
int pollingThreadCount) {
boolean pushEmptyProtection, int pollingThreadCount) {
// init executorService
this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
@Override
Expand All @@ -107,12 +109,12 @@ public Thread newThread(Runnable r) {
} else {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
}

this.pushEmptyProtection = pushEmptyProtection;
this.updatingMap = new ConcurrentHashMap<String, Object>();
this.failoverReactor = new FailoverReactor(this, cacheDir);
this.pushReceiver = new PushReceiver(this);
this.notifier = new InstancesChangeNotifier();

NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
NotifyCenter.registerSubscriber(notifier);
}
Expand Down Expand Up @@ -161,7 +163,8 @@ public List<ServiceInfo> getSubscribeServices() {
public ServiceInfo processServiceJson(String json) {
ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {

if (pushEmptyProtection && !serviceInfo.validate()) {
horizonzy marked this conversation as resolved.
Show resolved Hide resolved
//empty or error push, just ignore
return oldService;
}
Expand Down