Skip to content

Commit

Permalink
#502 Fix serialize problem and do some refactoring for health check
Browse files Browse the repository at this point in the history
  • Loading branch information
nkorange committed Jan 27, 2019
1 parent e3440fe commit caf7213
Show file tree
Hide file tree
Showing 25 changed files with 93 additions and 127 deletions.
Expand Up @@ -50,6 +50,7 @@ public class Service {
/**
* Health check mode.
*/
@Deprecated
private String healthCheckMode;

/**
Expand Down
4 changes: 4 additions & 0 deletions console/src/main/resources/META-INF/nacos-default.properties
Expand Up @@ -64,3 +64,7 @@ server.tomcat.accesslog.enabled=true
server.tomcat.accesslog.pattern=%h %l %u %t "%r" %s %b %D
# default current work dir
server.tomcat.basedir=

nacos.naming.partition.taskDispatchThreadCount=10
nacos.naming.partition.taskDispatchPeriod=200
nacos.naming.partition.batchSyncKeyCount=1000
4 changes: 4 additions & 0 deletions distribution/conf/application.properties
Expand Up @@ -38,3 +38,7 @@ server.tomcat.basedir=
#nacos.security.ignore.urls=/**

nacos.security.ignore.urls=/,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/v1/auth/login,/v1/console/health,/v1/cs/**,/v1/ns/**,/v1/cmdb/**,/actuator/**

nacos.naming.partition.taskDispatchThreadCount=10
nacos.naming.partition.taskDispatchPeriod=200
nacos.naming.partition.batchSyncKeyCount=1000
Expand Up @@ -28,8 +28,10 @@
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand All @@ -43,6 +45,7 @@
* @since 1.0.0
*/
@Component
@DependsOn("serverListManager")
public class DataSyncer implements MemberChangeListener {

@Autowired
Expand All @@ -61,7 +64,8 @@ public class DataSyncer implements MemberChangeListener {

private List<Member> servers;

public DataSyncer() {
@PostConstruct
public void init() {
serverListManager.listen(this);
startTimedSync();
}
Expand Down
Expand Up @@ -16,22 +16,24 @@
package com.alibaba.nacos.naming.consistency.ephemeral.partition;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
* Stores some configurations for Partition protocol
*
* @author nkorange
* @since 1.0.0
*/
@Component
public class PartitionConfig {

@Value("taskDispatchThreadCount")
@Value("${nacos.naming.partition.taskDispatchThreadCount}")
private int taskDispatchThreadCount = 10;

@Value("taskDispatchPeriod")
@Value("${nacos.naming.partition.taskDispatchPeriod}")
private int taskDispatchPeriod = 2000;

@Value("batchSyncKeyCount")
@Value("${nacos.naming.partition.batchSyncKeyCount}")
private int batchSyncKeyCount = 1000;

public int getTaskDispatchThreadCount() {
Expand Down
Expand Up @@ -135,25 +135,37 @@ public synchronized static Datum readDatum(File file, String namespaceId) throws
}

if (KeyBuilder.matchServiceMetaKey(file.getName())) {
return JSON.parseObject(json.replace("\\", ""), new TypeReference<Datum<Service>>() {
});
try {
return JSON.parseObject(json.replace("\\", ""), new TypeReference<Datum<Service>>() {
});
} catch (Exception e) {
Datum<String> datum = JSON.parseObject(json, new TypeReference<Datum<String>>(){});
Datum<Service> serviceDatum = new Datum<>();
serviceDatum.timestamp.set(datum.timestamp.get());
serviceDatum.key = datum.key;
serviceDatum.value = JSON.parseObject(datum.value, Service.class);
return serviceDatum;
}
}

if (KeyBuilder.matchInstanceListKey(file.getName())) {
Datum<List<Instance>> datum = JSON.parseObject(json, new TypeReference<Datum<List<Instance>>>() {
});
Map<String, Instance> instanceMap = new HashMap<>(64);
if (datum.value == null || datum.value.isEmpty()) {
return datum;
}
for (Instance instance : datum.value) {
instanceMap.put(instance.getDatumKey(), instance);
try {
return JSON.parseObject(json, new TypeReference<Datum<Instances>>() {
});
} catch (Exception e) {
Datum<String> datum = JSON.parseObject(json, new TypeReference<Datum<String>>(){});
List<Instance> instanceList = JSON.parseObject(datum.value, new TypeReference<List<Instance>>(){});
Datum<Instances> instancesDatum = new Datum<>();
instancesDatum.key = datum.key;
instancesDatum.timestamp.set(datum.timestamp.get());

Instances instances = new Instances();
instances.setInstanceMap(new HashMap<>(16));
for (Instance instance : instanceList) {
instances.getInstanceMap().put(instance.getDatumKey(), instance);
}
return instancesDatum;
}
Datum<Map<String, Instance>> mapDatum = new Datum<>();
mapDatum.value = instanceMap;
mapDatum.key = datum.key;
mapDatum.timestamp.set(datum.timestamp.get());
return mapDatum;
}

return JSON.parseObject(json, Datum.class);
Expand Down
Expand Up @@ -46,7 +46,7 @@
* @author nkorange
*/
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + UtilsAndCommons.NACOS_NAMING_CATALOG_CONTEXT)
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/catalog")
public class CatalogController {

@Autowired
Expand Down
Expand Up @@ -23,7 +23,7 @@
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.healthcheck.HealthCheckMode;
import com.alibaba.nacos.naming.healthcheck.HealthCheckType;
import com.alibaba.nacos.naming.misc.HttpClient;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
Expand Down Expand Up @@ -102,7 +102,7 @@ public String update(HttpServletRequest request) throws Exception {
} else {
Service service = serviceManager.getService(namespaceId, dom);
// Only health check "none" need update health status with api
if (service.getHealthCheckMode().equals(HealthCheckMode.none.name())) {
if (HealthCheckType.NONE.name().equals(service.getClusterMap().get(clusterName).getHealthChecker().getType())) {
for (Instance instance : service.allIPs(Lists.newArrayList(clusterName))) {
if (instance.getIp().equals(ip) && instance.getPort() == port) {
instance.setValid(valid);
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;
Expand All @@ -45,6 +46,7 @@
* @author nkorange
*/
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/operator")
public class OperatorController {

@Autowired
Expand Down Expand Up @@ -120,7 +122,7 @@ public String updateSwitch(HttpServletRequest request) throws Exception {
}


@RequestMapping("/metrics")
@RequestMapping(value = "/metrics", method = RequestMethod.GET)
public JSONObject metrics(HttpServletRequest request) {

JSONObject result = new JSONObject();
Expand Down
Expand Up @@ -78,7 +78,6 @@ public String create(HttpServletRequest request) throws Exception {
}

float protectThreshold = NumberUtils.toFloat(WebUtils.optional(request, "protectThreshold", "0"));
String healthCheckMode = WebUtils.optional(request, "healthCheckMode", switchDomain.defaultHealthCheckMode);
String metadata = WebUtils.optional(request, "metadata", StringUtils.EMPTY);
String selector = WebUtils.optional(request, "selector", StringUtils.EMPTY);
Map<String, String> metadataMap = new HashMap<>(16);
Expand All @@ -89,7 +88,6 @@ public String create(HttpServletRequest request) throws Exception {
Service domObj = new Service();
domObj.setName(serviceName);
domObj.setProtectThreshold(protectThreshold);
domObj.setHealthCheckMode(healthCheckMode.toLowerCase());
domObj.setEnabled(true);
domObj.setMetadata(metadataMap);
domObj.setSelector(parseSelector(selector));
Expand Down Expand Up @@ -142,7 +140,6 @@ public JSONObject detail(HttpServletRequest request) throws Exception {
res.put("name", serviceName);
res.put("namespaceId", domain.getNamespaceId());
res.put("protectThreshold", domain.getProtectThreshold());
res.put("healthCheckMode", domain.getHealthCheckMode());
res.put("metadata", domain.getMetadata());
res.put("selector", domain.getSelector());

Expand Down
Expand Up @@ -409,10 +409,6 @@ public void update(Service vDom) {
resetWeight = vDom.getResetWeight();
}

if (getHealthCheckMode().equals(vDom.getHealthCheckMode())) {
Loggers.SRV_LOG.info("[DOM-UPDATE] dom: {}, healthCheckMode: {} -> {}", getName(), getHealthCheckMode(), vDom.getHealthCheckMode());
}

if (enabled != vDom.getEnabled().booleanValue()) {
Loggers.SRV_LOG.info("[DOM-UPDATE] dom: {}, enabled: {} -> {}", getName(), enabled, vDom.getEnabled());
enabled = vDom.getEnabled();
Expand Down
Expand Up @@ -60,12 +60,11 @@ public String taskKey() {
@Override
public void run() {
try {
if (!domain.getHealthCheckMode().equals(HealthCheckMode.client.name()) ||
!getDistroMapper().responsible(domain.getName())) {
if (!getDistroMapper().responsible(domain.getName())) {
return;
}

List<Instance> instances = domain.allIPs();
List<Instance> instances = domain.allIPs(true);

for (Instance instance : instances) {
if (System.currentTimeMillis() - instance.getLastBeat() > ClientBeatProcessor.CLIENT_BEAT_TIMEOUT) {
Expand Down
Expand Up @@ -61,10 +61,6 @@ public String getType() {

public void process() {
Service service = this.service;
if (!service.getHealthCheckMode().equals(HealthCheckMode.client.name())) {
return;
}

Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());

String ip = rsInfo.getIp();
Expand Down
Expand Up @@ -108,10 +108,6 @@ public void run() {
}, 500, TimeUnit.MILLISECONDS);
}

public boolean isHealthCheckEnabled(Service service) {
return service.getHealthCheckMode().equals(HealthCheckMode.server.name());
}

public void reEvaluateCheckRT(long checkRT, HealthCheckTask task, SwitchDomain.HealthParams params) {
task.setCheckRTLast(checkRT);

Expand Down

This file was deleted.

Expand Up @@ -33,6 +33,9 @@ public class HealthCheckProcessorDelegate implements HealthCheckProcessor {
@Autowired
private MysqlHealthCheckProcessor mysqlProcessor;

@Autowired
private NoneHealthCheckProcessor noneProcessor;

@Override
public void process(HealthCheckTask task) {

Expand All @@ -53,7 +56,7 @@ public void process(HealthCheckTask task) {
return;
}

throw new IllegalArgumentException("Unknown check type: " + type);
noneProcessor.process(task);
}

@Override
Expand Down
Expand Up @@ -19,7 +19,6 @@
import com.alibaba.nacos.naming.boot.SpringContext;
import com.alibaba.nacos.naming.core.Cluster;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import org.apache.commons.lang3.RandomUtils;
Expand Down Expand Up @@ -70,7 +69,8 @@ public void initCheckRT() {
public void run() {

try {
if (distroMapper.responsible(cluster.getDom().getName())) {
if (distroMapper.responsible(cluster.getDom().getName()) &&
switchDomain.isHealthCheckEnabled(cluster.getDom().getName())) {
healthCheckProcessor.process(this);
Loggers.EVT_LOG.debug("[HEALTH-CHECK] schedule health check task: {}", cluster.getDom().getName());
}
Expand All @@ -92,12 +92,10 @@ public void run() {
this.setCheckRTLastLast(this.getCheckRTLast());

Cluster cluster = this.getCluster();
if ((cluster.getDom()).getHealthCheckMode().equals(HealthCheckMode.server.name())) {
Loggers.CHECK_RT.info("{}:{}@{}->normalized: {}, worst: {}, best: {}, last: {}, diff: {}",
cluster.getDom().getName(), cluster.getName(), cluster.getHealthChecker().getType(),
this.getCheckRTNormalized(), this.getCheckRTWorst(), this.getCheckRTBest(),
this.getCheckRTLast(), diff);
}
Loggers.CHECK_RT.info("{}:{}@{}->normalized: {}, worst: {}, best: {}, last: {}, diff: {}",
cluster.getDom().getName(), cluster.getName(), cluster.getHealthChecker().getType(),
this.getCheckRTNormalized(), this.getCheckRTWorst(), this.getCheckRTBest(),
this.getCheckRTLast(), diff);
}
}
}
Expand Down
Expand Up @@ -30,5 +30,9 @@ public enum HealthCheckType {
/**
* MySQL type
*/
MYSQL
MYSQL,
/**
* No check
*/
NONE
}
Expand Up @@ -90,9 +90,7 @@ public void process(HealthCheckTask task) {
return;
}

Service service = task.getCluster().getDom();

if (!switchDomain.isHealthCheckEnabled() || !service.getHealthCheckMode().equals(HealthCheckMode.server.name())) {
if (!switchDomain.isHealthCheckEnabled()) {
return;
}

Expand Down
Expand Up @@ -96,12 +96,6 @@ public void process(HealthCheckTask task) {
return;
}

Service service = task.getCluster().getDom();

if (!healthCheckCommon.isHealthCheckEnabled(service)) {
return;
}

for (Instance ip : ips) {
try {

Expand Down

0 comments on commit caf7213

Please sign in to comment.