Skip to content

Commit

Permalink
#502 Fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
dungu.zpf committed Apr 9, 2019
1 parent 04797de commit f3b1370
Show file tree
Hide file tree
Showing 12 changed files with 71 additions and 6 deletions.
Expand Up @@ -187,6 +187,8 @@ public void onRemove(String key) {
return;
}

listeners.remove(key);

notifier.addTask(key, ApplyAction.DELETE);
}

Expand Down
Expand Up @@ -48,6 +48,13 @@ public class TaskDispatcher {

@PostConstruct
public void init() {

if (partitionConfig.getTaskDispatchThreadCount() > Runtime.getRuntime().availableProcessors()) {
Loggers.EPHEMERAL.error("should not larger than {}, current is: {}",
Runtime.getRuntime().availableProcessors(), partitionConfig.getTaskDispatchThreadCount());
throw new RuntimeException("task dispatch thread count is too large!");
}

for (int i = 0; i < partitionConfig.getTaskDispatchThreadCount(); i++) {
TaskScheduler taskScheduler = new TaskScheduler(i);
taskSchedulerList.add(taskScheduler);
Expand Down
Expand Up @@ -62,9 +62,11 @@ public void remove(String key) throws NacosException {
Datum datum = new Datum();
datum.key = key;
raftCore.onDelete(datum.key, peers.getLeader());
raftCore.unlistenAll(key);
return;
}
raftCore.signalDelete(key);
raftCore.unlistenAll(key);
} catch (Exception e) {
Loggers.RAFT.error("Raft remove failed.", e);
throw new NacosException(NacosException.SERVER_ERROR, "Raft remove failed, key:" + key);
Expand Down
Expand Up @@ -809,6 +809,10 @@ public void unlisten(String key, RecordListener listener) {
}
}

public void unlistenAll(String key) {
listeners.remove(key);
}

public void setTerm(long term) {
peers.setTerm(term);
}
Expand Down
Expand Up @@ -121,6 +121,7 @@ public String deregister(HttpServletRequest request) throws Exception {
return "ok";
}

@CanDistro
@RequestMapping(value = "", method = RequestMethod.PUT)
public String update(HttpServletRequest request) throws Exception {
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.pojo.Record;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;

import java.math.BigInteger;
import java.nio.charset.Charset;
Expand All @@ -28,6 +29,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* Package of instance list
Expand Down Expand Up @@ -72,7 +74,7 @@ private void recalculateChecksum() {
Collections.sort(instanceList);
for (Instance ip : instanceList) {
String string = ip.getIp() + ":" + ip.getPort() + "_" + ip.getWeight() + "_"
+ ip.isHealthy() + "_" + ip.getClusterName();
+ ip.isHealthy() + "_" + ip.isEnabled() + "_" + ip.getClusterName() + "_" + convertMap2String(ip.getMetadata());
sb.append(string);
sb.append(",");
}
Expand All @@ -87,4 +89,22 @@ private void recalculateChecksum() {
}
lastCalculateTime = System.currentTimeMillis();
}

public String convertMap2String(Map<String, String> map) {

if (map == null || map.isEmpty()) {
return StringUtils.EMPTY;
}

StringBuilder sb = new StringBuilder();
List<String> keys = new ArrayList<>(map.keySet());
Collections.sort(keys);
for (String key : keys) {
sb.append(key);
sb.append(":");
sb.append(map.get(key));
sb.append(",");
}
return sb.toString();
}
}
Expand Up @@ -168,7 +168,9 @@ public void onDelete(String key) throws Exception {
if (service != null) {
service.destroy();
consistencyService.remove(KeyBuilder.buildInstanceListKey(namespace, name, true));

consistencyService.remove(KeyBuilder.buildInstanceListKey(namespace, name, false));

consistencyService.unlisten(KeyBuilder.buildServiceMetaKey(namespace, name), service);
Loggers.SRV_LOG.info("[DEAD-SERVICE] {}", service.toJSON());
}
Expand Down
Expand Up @@ -72,7 +72,9 @@ public void run() {
if (distroMapper.responsible(cluster.getService().getName()) &&
switchDomain.isHealthCheckEnabled(cluster.getService().getName())) {
healthCheckProcessor.process(this);
Loggers.EVT_LOG.debug("[HEALTH-CHECK] schedule health check task: {}", cluster.getService().getName());
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[HEALTH-CHECK] schedule health check task: {}", cluster.getService().getName());
}
}
} catch (Throwable e) {
Loggers.SRV_LOG.error("[HEALTH-CHECK] error while process health check for {}:{}",
Expand Down
Expand Up @@ -482,6 +482,15 @@ public static String encodingParams(Map<String, String> params, String encoding)
return sb.toString();
}

public static Map<String, String> translateParameterMap(Map<String, String[]> parameterMap) {

Map<String, String> map = new HashMap<>(16);
for (String key : parameterMap.keySet()) {
map.put(key, parameterMap.get(key)[0]);
}
return map;
}

public static class HttpResult {
final public int code;
final public String content;
Expand Down
Expand Up @@ -236,4 +236,12 @@ public String toUrl() {
return sb.toString();
}
}

public static void main(String[] args) throws Exception {

String key = "com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@test.10";
List<String> keys = new ArrayList<>();
keys.add(key);
getData(keys, "11.239.112.161:8848");
}
}
Expand Up @@ -32,7 +32,10 @@
import java.lang.reflect.Method;
import java.net.URI;
import java.security.AccessControlException;
import java.util.*;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;

/**
* @author nacos
Expand Down Expand Up @@ -61,7 +64,11 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo
HttpServletRequest req = (HttpServletRequest) servletRequest;
HttpServletResponse resp = (HttpServletResponse) servletResponse;

String urlString = req.getRequestURI() + "?" + req.getQueryString();
String urlString = req.getRequestURI();

if (StringUtils.isNotBlank(req.getQueryString())) {
urlString += "?" + req.getQueryString();
}

try {
String path = new URI(req.getRequestURI()).getPath();
Expand Down Expand Up @@ -98,7 +105,8 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo
headerList.add(req.getHeader(headerName));
}
HttpClient.HttpResult result =
HttpClient.request("http://" + distroMapper.mapSrv(groupedServiceName) + urlString, headerList, new HashMap<>(2)
HttpClient.request("http://" + distroMapper.mapSrv(groupedServiceName) + urlString, headerList,
StringUtils.isBlank(req.getQueryString()) ? HttpClient.translateParameterMap(req.getParameterMap()) : new HashMap<>(2)
, PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, "UTF-8", req.getMethod());

try {
Expand Down
2 changes: 1 addition & 1 deletion test/src/test/resources/application.properties
Expand Up @@ -19,7 +19,7 @@ server.tomcat.accesslog.pattern=%h %l %u %t "%r" %s %b %D
# default current work dir
server.tomcat.basedir=

nacos.naming.distro.taskDispatchThreadCount=10
nacos.naming.distro.taskDispatchThreadCount=1
nacos.naming.distro.taskDispatchPeriod=200
nacos.naming.distro.batchSyncKeyCount=1000
nacos.naming.distro.initDataRatio=0.9
Expand Down

0 comments on commit f3b1370

Please sign in to comment.