Skip to content

Commit

Permalink
#502 Partition Protocol develop finished
Browse files Browse the repository at this point in the history
  • Loading branch information
nkorange committed Jan 22, 2019
1 parent cc9b6b5 commit bc4d4d3
Show file tree
Hide file tree
Showing 14 changed files with 203 additions and 59 deletions.
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.ephemeral.partition;
package com.alibaba.nacos.naming.cluster.transport;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
Expand Down
Expand Up @@ -13,12 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.ephemeral.partition;
package com.alibaba.nacos.naming.cluster.transport;

import java.util.Map;

/**
* Serializer for data
* Serializer specially for large map of data
*
* @author nkorange
* @since 1.0.0
Expand Down
Expand Up @@ -51,8 +51,7 @@ public interface DataListener {
* Action to do if data of target key has been removed
*
* @param key target key
* @param value data of the key
* @throws Exception
*/
void onDelete(String key, Object value) throws Exception;
void onDelete(String key) throws Exception;
}
Expand Up @@ -18,6 +18,16 @@
import com.alibaba.nacos.naming.consistency.ConsistencyService;

/**
* A type of consistency for ephemeral data.
* <p>
* This kind of consistency is not required to store data on disk or database, because the
* ephemeral data always keeps a session with server and as long as the session still lives
* the ephemeral data won't be lost.
* <p>
* What is required is that writing should always be successful even if network partition
* happens. And when the network recovers, data of each partition is merged into one set,
* so the cluster resumes to a consistent status.
*
* @author nkorange
* @since 1.0.0
*/
Expand Down
Expand Up @@ -15,14 +15,10 @@
*/
package com.alibaba.nacos.naming.consistency.ephemeral.partition;

import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.naming.consistency.Datum;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
Expand All @@ -40,8 +36,8 @@ public void put(String key, Datum value) {
dataMap.put(key, value);
}

public void remove(String key) {
dataMap.remove(key);
public Datum remove(String key) {
return dataMap.remove(key);
}

public Set<String> keys() {
Expand All @@ -52,6 +48,10 @@ public Datum get(String key) {
return dataMap.get(key);
}

public boolean contains(String key) {
return dataMap.containsKey(key);
}

public Map<String, Datum> batchGet(List<String> keys) {
Map<String, Datum> map = new HashMap<>();
for (String key : keys) {
Expand Down
Expand Up @@ -15,10 +15,10 @@
*/
package com.alibaba.nacos.naming.consistency.ephemeral.partition;

import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.naming.cluster.ServerListManager;
import com.alibaba.nacos.naming.cluster.members.Member;
import com.alibaba.nacos.naming.cluster.members.MemberChangeListener;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
Expand Down Expand Up @@ -132,9 +132,9 @@ public void run() {
keyTimestamps.put(key, dataStore.get(key).timestamp.get());
}

// TODO
// for (Member member : )

for (Member member : servers) {
NamingProxy.syncTimestamps(keyTimestamps, member.getKey());
}
}
}

Expand Down
Expand Up @@ -17,12 +17,14 @@

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.consistency.DataListener;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.ephemeral.EphemeralConsistencyService;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingProxy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

Expand All @@ -36,8 +38,11 @@
* <p>
* Use a partition algorithm to divide data into many blocks. Each Nacos server node takes
* responsibility for exactly one block of data. Each block of data is generated, removed
* and synchronized by its associated server. So every Nacos only handles writings for a
* subset of the total service data, and at mean time stores complete service data.
* and synchronized by its responsible server. So every Nacos server only handles writings
* for a subset of the total service data.
* <p>
* At mean time every Nacos server receives data sync of other Nacos server, so every Nacos
* server will eventually have a complete set of data.
*
* @author nkorange
* @since 1.0.0
Expand All @@ -54,25 +59,20 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ
@Autowired
private TaskDispatcher taskDispatcher;

@Autowired
private Serializer serializer;

private volatile Map<String, List<DataListener>> listeners = new ConcurrentHashMap<>();

@Override
public void put(String key, Object value) throws NacosException {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
List<Instance> instances = (List<Instance>) value;
Datum<List<Instance>> datum = new Datum<>();
datum.value = instances;
datum.key = key;
datum.timestamp.set(System.currentTimeMillis());
dataStore.put(key, datum);
}
taskDispatcher.addTask(key);
onPut(key, value);
taskDispatcher.addTask(key);
}

@Override
public void remove(String key) throws NacosException {
dataStore.remove(key);
onRemove(key);
}

@Override
Expand All @@ -81,6 +81,16 @@ public Object get(String key) throws NacosException {
}

public void onPut(String key, Object value) {

if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
List<Instance> instances = (List<Instance>) value;
Datum<List<Instance>> datum = new Datum<>();
datum.value = instances;
datum.key = key;
datum.timestamp.set(System.currentTimeMillis());
dataStore.put(key, datum);
}

if (!listeners.containsKey(key)) {
return;
}
Expand All @@ -93,6 +103,63 @@ public void onPut(String key, Object value) {
}
}

public void onRemove(String key) {

dataStore.remove(key);

if (!listeners.containsKey(key)) {
return;
}
for (DataListener listener : listeners.get(key)) {
try {
listener.onDelete(key);
} catch (Exception e) {
Loggers.EPHEMERAL.error("notify " + listener + ", key:" + key + " failed.", e);
}
}
}

public void onReceiveTimestamps(Map<String, Long> timestamps, String server) {

List<String> toUpdateKeys = new ArrayList<>();
List<String> toRemoveKeys = new ArrayList<>();
for (Map.Entry<String, Long> entry : timestamps.entrySet()) {
if (isResponsible(entry.getKey())) {
// this key should not be sent from remote server:
Loggers.EPHEMERAL.error("receive timestamp of " + entry.getKey() + " from " + server);
continue;
}
if (!dataStore.contains(entry.getKey()) || dataStore.get(entry.getKey()).timestamp.get() < entry.getValue()) {
toUpdateKeys.add(entry.getKey());
}
}

for (String key : dataStore.keys()) {
if (!timestamps.containsKey(key)) {
toRemoveKeys.add(key);
}
}

Loggers.EPHEMERAL.info("to remove keys:" + toRemoveKeys);

for (String key : toRemoveKeys) {
onRemove(key);
}

try {
byte[] result = NamingProxy.getData(toUpdateKeys, server);
if (result.length > 0) {
Map<String, Datum> datumMap = serializer.deserialize(result, Datum.class);
for (Map.Entry<String, Datum> entry : datumMap.entrySet()) {
dataStore.put(entry.getKey(), entry.getValue());
}
}
} catch (Exception e) {
Loggers.EPHEMERAL.error("get data from " + server + " failed!", e);
}

}

@Override
public void listen(String key, DataListener listener) throws NacosException {
if (!listeners.containsKey(key)) {
Expand Down
Expand Up @@ -752,7 +752,6 @@ public Integer onCompleted(Response response) throws Exception {

}


return local;
}

Expand Down Expand Up @@ -917,7 +916,7 @@ public void run() {
}

if (action == ApplyAction.DELETE) {
listener.onDelete(datum.key, datum.value);
listener.onDelete(datum.key);
}
} catch (Throwable e) {
Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {} {}", datum.key, e);
Expand All @@ -944,7 +943,7 @@ public void run() {
}

if (action == ApplyAction.DELETE) {
listener.onDelete(datum.key, datum.value);
listener.onDelete(datum.key);
continue;
}
} catch (Throwable e) {
Expand Down
Expand Up @@ -2,16 +2,19 @@

import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.common.util.IoUtils;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.ephemeral.partition.PartitionConsistencyServiceImpl;
import com.alibaba.nacos.naming.consistency.ephemeral.partition.Serializer;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -43,4 +46,23 @@ public String onSync(HttpServletRequest request, HttpServletResponse response) t
}
return "ok";
}

@RequestMapping("/syncTimestamps")
public String syncTimestamps(HttpServletRequest request, HttpServletResponse response) throws Exception {
String source = WebUtils.required(request, "source");
byte[] data = IoUtils.tryDecompress(request.getInputStream());
Map<String, Long> dataMap = serializer.deserialize(data, Long.class);
consistencyService.onReceiveTimestamps(dataMap, source);
return "ok";
}

@RequestMapping("/get")
public void get(HttpServletRequest request, HttpServletResponse response) throws Exception {
String keys = WebUtils.required(request, "keys");
Map<String, Datum> datumMap = new HashMap<>();
for (String key : keys.split(",")) {
datumMap.put(key, (Datum) consistencyService.get(key));
}
response.getWriter().write(new String(serializer.serialize(datumMap), "UTF-8"));
}
}
Expand Up @@ -178,12 +178,12 @@ public void onChange(String key, Object value) throws Exception {
}

@Override
public void onDelete(String key, Object value) throws Exception {
public void onDelete(String key) throws Exception {
String domKey = StringUtils.removeStart(key, UtilsAndCommons.DOMAINS_DATA_ID_PRE);
String namespace = domKey.split(UtilsAndCommons.SERVICE_GROUP_CONNECTOR)[0];
String name = domKey.split(UtilsAndCommons.SERVICE_GROUP_CONNECTOR)[1];
VirtualClusterDomain dom = chooseDomMap(namespace).remove(name);
Loggers.RAFT.info("[RAFT-NOTIFIER] datum is deleted, key: {}, value: {}", key, value);
Loggers.RAFT.info("[RAFT-NOTIFIER] datum is deleted, key: {}", key);

if (dom != null) {
dom.destroy();
Expand Down
Expand Up @@ -197,7 +197,7 @@ public void onChange(String key, Object value) throws Exception {
}

@Override
public void onDelete(String key, Object value) throws Exception {
public void onDelete(String key) throws Exception {
// ignore
}

Expand Down
23 changes: 23 additions & 0 deletions naming/src/main/java/com/alibaba/nacos/naming/misc/HttpClient.java
Expand Up @@ -280,6 +280,29 @@ public static HttpResult httpPost(String url, List<String> headers, Map<String,
}
}

public static void asyncHttpPutLarge(String url, Map<String, String> headers, byte[] content, AsyncCompletionHandler handler) throws Exception {
AsyncHttpClient.BoundRequestBuilder builder = asyncHttpClient.preparePut(url);

if (!headers.isEmpty()) {
for (String headerKey : headers.keySet()) {
builder.setHeader(headerKey, headers.get(headerKey));
}
}

builder.setBody(content);

builder.setHeader("Content-Type", "application/json; charset=UTF-8");
builder.setHeader("Accept-Charset", "UTF-8");
builder.setHeader("Accept-Encoding", "gzip");
builder.setHeader("Content-Encoding", "gzip");

if (handler != null) {
builder.execute(handler);
} else {
builder.execute();
}
}

public static HttpResult httpPutLarge(String url, Map<String, String> headers, byte[] content) {
try {
HttpClientBuilder builder = HttpClients.custom();
Expand Down

0 comments on commit bc4d4d3

Please sign in to comment.