Skip to content

Commit

Permalink
#502 Fix several bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
nkorange committed Mar 18, 2019
1 parent 3f2d704 commit 52d4e01
Show file tree
Hide file tree
Showing 18 changed files with 104 additions and 29 deletions.
2 changes: 1 addition & 1 deletion api/pom.xml
Expand Up @@ -16,7 +16,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>1.0.0-RC1</version>
<version>1.0.0-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion client/pom.xml
Expand Up @@ -16,7 +16,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>1.0.0-RC1</version>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion cmdb/pom.xml
Expand Up @@ -18,7 +18,7 @@
<parent>
<artifactId>nacos-all</artifactId>
<groupId>com.alibaba.nacos</groupId>
<version>1.0.0-RC1</version>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion common/pom.xml
Expand Up @@ -18,7 +18,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>1.0.0-RC1</version>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion config/pom.xml
Expand Up @@ -17,7 +17,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>1.0.0-RC1</version>
<version>1.0.0-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion console/pom.xml
Expand Up @@ -18,7 +18,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>1.0.0-RC1</version>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>nacos-console</artifactId>
<!--<packaging>war</packaging>-->
Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Expand Up @@ -18,7 +18,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>1.0.0-RC1</version>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion distribution/pom.xml
Expand Up @@ -18,7 +18,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>1.0.0-RC1</version>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion example/pom.xml
Expand Up @@ -18,7 +18,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>1.0.0-RC1</version>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion naming/pom.xml
Expand Up @@ -18,7 +18,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>1.0.0-RC1</version>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Expand Up @@ -102,6 +102,8 @@ public Thread newThread(Runnable r) {

private volatile Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();

private volatile Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);

@PostConstruct
public void init() throws Exception {
GlobalExecutor.submit(new Runnable() {
Expand Down Expand Up @@ -190,6 +192,14 @@ public void onRemove(String key) {

public void onReceiveChecksums(Map<String, String> checksumMap, String server) {

if (syncChecksumTasks.containsKey(server)) {
// Already in process of this server:
Loggers.EPHEMERAL.warn("sync checksum task already in process with {}", server);
return;
}

syncChecksumTasks.put(server, "1");

List<String> toUpdateKeys = new ArrayList<>();
List<String> toRemoveKeys = new ArrayList<>();
for (Map.Entry<String, String> entry : checksumMap.entrySet()) {
Expand Down Expand Up @@ -234,6 +244,8 @@ public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
Loggers.EPHEMERAL.error("get data from " + server + " failed!", e);
}

// Remove this 'in process' flag:
syncChecksumTasks.remove(server);
}

public boolean syncAllDataFromRemote(Server server) {
Expand Down Expand Up @@ -261,6 +273,7 @@ public void processData(byte[] data) throws Exception {
// pretty sure the service not exist:
if (ServerMode.AP.name().equals(switchDomain.getServerMode())) {
// create empty service
Loggers.EPHEMERAL.info("creating service {}", entry.getKey());
Service service = new Service();
String serviceName = KeyBuilder.getServiceName(entry.getKey());
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
Expand All @@ -277,19 +290,24 @@ public void processData(byte[] data) throws Exception {
}

for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
dataStore.put(entry.getKey(), entry.getValue());

if (!listeners.containsKey(entry.getKey())) {
Loggers.EPHEMERAL.warn("listener not found: {}", entry.getKey());
// Should not happen:
Loggers.EPHEMERAL.warn("listener of {} not found.", entry.getKey());
continue;
}
for (RecordListener listener : listeners.get(entry.getKey())) {
try {

try {
for (RecordListener listener : listeners.get(entry.getKey())) {
listener.onChange(entry.getKey(), entry.getValue().value);
} catch (Exception e) {
Loggers.EPHEMERAL.error("notify " + listener + ", key: " + entry.getKey() + " failed.", e);
}
} catch (Exception e) {
Loggers.EPHEMERAL.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
continue;
}

// Update data store if listener executed successfully:
dataStore.put(entry.getKey(), entry.getValue());
}
}
}
Expand Down Expand Up @@ -384,7 +402,7 @@ public void run() {
continue;
}
} catch (Throwable e) {
Loggers.EPHEMERAL.error("[NACOS-DISTRO] error while notifying listener of key: {} {}", datumKey, e);
Loggers.EPHEMERAL.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}

Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package com.alibaba.nacos.naming.controllers;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.cluster.ServerMode;
Expand Down Expand Up @@ -106,7 +107,9 @@ public String syncChecksum(HttpServletRequest request, HttpServletResponse respo

@RequestMapping(value = "/datum", method = RequestMethod.GET)
public void get(HttpServletRequest request, HttpServletResponse response) throws Exception {
String keys = WebUtils.required(request, "keys");

String entity = IOUtils.toString(request.getInputStream(), "UTF-8");
String keys = JSON.parseObject(entity).getString("keys");
String keySplitter = ",";
Map<String, Datum> datumMap = new HashMap<>(64);
for (String key : keys.split(keySplitter)) {
Expand Down
17 changes: 11 additions & 6 deletions naming/src/main/java/com/alibaba/nacos/naming/core/Service.java
Expand Up @@ -18,8 +18,8 @@
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.annotation.JSONField;
import com.alibaba.nacos.naming.boot.SpringContext;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.healthcheck.ClientBeatCheckTask;
import com.alibaba.nacos.naming.healthcheck.ClientBeatProcessor;
import com.alibaba.nacos.naming.healthcheck.HealthCheckReactor;
Expand Down Expand Up @@ -156,14 +156,19 @@ public void onChange(String key, Instances value) throws Exception {

Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);

for (Instance ip : value.getInstanceList()) {
for (Instance instance : value.getInstanceList()) {

if (instance == null) {
// Reject this abnormal instance list:
throw new RuntimeException("got null instance " + key);
}

if (ip.getWeight() > 10000.0D) {
ip.setWeight(10000.0D);
if (instance.getWeight() > 10000.0D) {
instance.setWeight(10000.0D);
}

if (ip.getWeight() < 0.01D && ip.getWeight() > 0.0D) {
ip.setWeight(0.01D);
if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
instance.setWeight(0.01D);
}
}

Expand Down
Expand Up @@ -79,6 +79,8 @@ public class ServiceManager implements RecordListener<Service> {
@Autowired
private PushService pushService;

private final Object putServiceLock = new Object();

@PostConstruct
public void init() {

Expand Down Expand Up @@ -330,6 +332,7 @@ public void addOrReplaceService(Service service) throws Exception {
public void createEmptyService(String namespaceId, String serviceName) throws NacosException {
Service service = getService(namespaceId, serviceName);
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
Expand Down Expand Up @@ -521,7 +524,11 @@ public boolean containService(String namespaceId, String serviceName) {

public void putService(Service service) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
synchronized (putServiceLock) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
}
}
}
serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
Expand Down
42 changes: 42 additions & 0 deletions naming/src/main/java/com/alibaba/nacos/naming/misc/HttpClient.java
Expand Up @@ -27,6 +27,8 @@
import org.apache.http.*;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ContentType;
Expand All @@ -41,6 +43,7 @@
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.net.URLEncoder;
import java.util.*;
Expand Down Expand Up @@ -313,6 +316,35 @@ public static HttpResult httpPutLarge(String url, Map<String, String> headers, b
}
}

public static HttpResult httpGetLarge(String url, Map<String, String> headers, String content) {

try {
HttpClientBuilder builder = HttpClients.custom();
builder.setUserAgent(UtilsAndCommons.SERVER_VERSION);
builder.setConnectionTimeToLive(500, TimeUnit.MILLISECONDS);

CloseableHttpClient httpClient = builder.build();
HttpGetWithEntity httpGetWithEntity = new HttpGetWithEntity();
httpGetWithEntity.setURI(new URI(url));

for (Map.Entry<String, String> entry : headers.entrySet()) {
httpGetWithEntity.setHeader(entry.getKey(), entry.getValue());
}

httpGetWithEntity.setEntity(new StringEntity(content, ContentType.create("application/json", "UTF-8")));
HttpResponse response = httpClient.execute(httpGetWithEntity);
HttpEntity entity = response.getEntity();

HeaderElement[] headerElements = entity.getContentType().getElements();
String charset = headerElements[0].getParameterByName("charset").getValue();

return new HttpResult(response.getStatusLine().getStatusCode(),
IOUtils.toString(entity.getContent(), charset), Collections.<String, String>emptyMap());
} catch (Exception e) {
return new HttpResult(500, e.toString(), Collections.<String, String>emptyMap());
}
}

public static HttpResult httpPostLarge(String url, Map<String, String> headers, String content) {
try {
HttpClientBuilder builder = HttpClients.custom();
Expand Down Expand Up @@ -442,4 +474,14 @@ public String getHeader(String name) {
return respHeaders.get(name);
}
}

public static class HttpGetWithEntity extends HttpEntityEnclosingRequestBase {

public final static String METHOD_NAME = "GET";

@Override
public String getMethod() {
return METHOD_NAME;
}
}
}
Expand Up @@ -79,8 +79,8 @@ public static byte[] getData(List<String> keys, String server) throws Exception

Map<String, String> params = new HashMap<>(8);
params.put("keys", StringUtils.join(keys, ","));
HttpClient.HttpResult result = HttpClient.httpGet("http://" + server + RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_GET_URL, new ArrayList<>(), params);
HttpClient.HttpResult result = HttpClient.httpGetLarge("http://" + server + RunningConfig.getContextPath()
+ UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_GET_URL, new HashMap<>(8), JSON.toJSONString(params));

if (HttpURLConnection.HTTP_OK == result.code) {
return result.content.getBytes();
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -21,7 +21,7 @@
<inceptionYear>2018</inceptionYear>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>1.0.0-RC1</version>
<version>1.0.0-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Alibaba NACOS ${project.version}</name>
Expand Down
2 changes: 1 addition & 1 deletion test/pom.xml
Expand Up @@ -17,7 +17,7 @@
<parent>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-all</artifactId>
<version>1.0.0-RC1</version>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down

0 comments on commit 52d4e01

Please sign in to comment.