Skip to content

Commit

Permalink
#502 Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
nkorange committed Mar 19, 2019
1 parent 52d4e01 commit f6ea504
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 40 deletions.
Expand Up @@ -21,9 +21,9 @@
import com.alibaba.fastjson.TypeReference;
import com.alibaba.nacos.naming.boot.RunningConfig;
import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.*;
Expand Down Expand Up @@ -312,7 +312,7 @@ public void onPublish(Datum datum, RaftPeer source) throws Exception {
}
raftStore.updateTerm(local.term.get());

notifier.addTask(datum, ApplyAction.CHANGE);
notifier.addTask(datum.key, ApplyAction.CHANGE);

Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
Expand Down Expand Up @@ -699,7 +699,7 @@ public Integer onCompleted(Response response) throws Exception {
}

datums.put(datum.key, datum);
notifier.addTask(datum, ApplyAction.CHANGE);
notifier.addTask(datum.key, ApplyAction.CHANGE);

local.resetLeaderDue();

Expand Down Expand Up @@ -844,7 +844,7 @@ public int datumSize() {

public void addDatum(Datum datum) {
datums.put(datum.key, datum);
notifier.addTask(datum, ApplyAction.CHANGE);
notifier.addTask(datum.key, ApplyAction.CHANGE);
}

public void loadDatum(String key) {
Expand All @@ -861,19 +861,17 @@ public void loadDatum(String key) {
}

private void deleteDatum(String key) {

Datum deleted = null;
Datum deleted;
try {
deleted = datums.remove(URLDecoder.decode(key, "UTF-8"));
if (deleted != null) {
raftStore.delete(deleted);
Loggers.RAFT.info("datum deleted, key: {}", key);
}
notifier.addTask(URLDecoder.decode(key, "UTF-8"), ApplyAction.DELETE);
} catch (UnsupportedEncodingException e) {
Loggers.RAFT.warn("datum key decode failed: {}", key);
}
// FIXME should we ignore the value of 'deleted'?
if (deleted != null) {
raftStore.delete(deleted);
notifier.addTask(deleted, ApplyAction.DELETE);
Loggers.RAFT.info("datum deleted, key: {}", key);
}
}

public boolean isInitialized() {
Expand All @@ -886,15 +884,15 @@ public class Notifier implements Runnable {

private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);

public void addTask(Datum datum, ApplyAction action) {
public void addTask(String datumKey, ApplyAction action) {

if (services.containsKey(datum.key) && action == ApplyAction.CHANGE) {
if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
return;
}
if (action == ApplyAction.CHANGE) {
services.put(datum.key, StringUtils.EMPTY);
services.put(datumKey, StringUtils.EMPTY);
}
tasks.add(Pair.with(datum, action));
tasks.add(Pair.with(datumKey, action));
}

public int getTaskSize() {
Expand All @@ -914,58 +912,58 @@ public void run() {
continue;
}

Datum datum = (Datum) pair.getValue0();
String datumKey = (String) pair.getValue0();
ApplyAction action = (ApplyAction) pair.getValue1();

services.remove(datum.key);
services.remove(datumKey);

int count = 0;

if (listeners.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) {

if (KeyBuilder.matchServiceMetaKey(datum.key) && !KeyBuilder.matchSwitchKey(datum.key)) {
if (KeyBuilder.matchServiceMetaKey(datumKey) && !KeyBuilder.matchSwitchKey(datumKey)) {

for (RecordListener listener : listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
try {
if (action == ApplyAction.CHANGE) {
listener.onChange(datum.key, getDatum(datum.key).value);
listener.onChange(datumKey, getDatum(datumKey).value);
}

if (action == ApplyAction.DELETE) {
listener.onDelete(datum.key);
listener.onDelete(datumKey);
}
} catch (Throwable e) {
Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {} {}", datum.key, e);
Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {} {}", datumKey, e);
}
}
}
}

if (!listeners.containsKey(datum.key)) {
if (!listeners.containsKey(datumKey)) {
continue;
}

for (RecordListener listener : listeners.get(datum.key)) {
for (RecordListener listener : listeners.get(datumKey)) {

count++;

try {
if (action == ApplyAction.CHANGE) {
listener.onChange(datum.key, getDatum(datum.key).value);
listener.onChange(datumKey, getDatum(datumKey).value);
continue;
}

if (action == ApplyAction.DELETE) {
listener.onDelete(datum.key);
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {} {}", datum.key, e);
Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {} {}", datumKey, e);
}
}

if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("[NACOS-RAFT] datum change notified, key: {}, listener count: {}", datum.key, count);
Loggers.RAFT.debug("[NACOS-RAFT] datum change notified, key: {}, listener count: {}", datumKey, count);
}
} catch (Throwable e) {
Loggers.RAFT.error("[NACOS-RAFT] Error while handling notifying task", e);
Expand Down
Expand Up @@ -63,7 +63,7 @@ public synchronized ConcurrentHashMap<String, Datum> loadDatums(RaftCore.Notifie
datum = readDatum(datumFile, cache.getName());
if (datum != null) {
datums.put(datum.key, datum);
notifier.addTask(datum, ApplyAction.CHANGE);
notifier.addTask(datum.key, ApplyAction.CHANGE);
}
}
continue;
Expand Down
Expand Up @@ -107,15 +107,6 @@ public String remove(HttpServletRequest request) throws Exception {
Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);

Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
throw new IllegalArgumentException("specified service not exist, serviceName : " + serviceName);
}

if (!service.allIPs().isEmpty()) {
throw new IllegalArgumentException("specified service has instances, serviceName : " + serviceName);
}

serviceManager.easyRemoveService(namespaceId, serviceName);

return "ok";
Expand Down
Expand Up @@ -322,6 +322,16 @@ public int getResponsibleInstanceCount() {
}

public void easyRemoveService(String namespaceId, String serviceName) throws Exception {

Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new IllegalArgumentException("specified service not exist, serviceName : " + serviceName);
}

if (!service.allIPs().isEmpty()) {
throw new IllegalArgumentException("specified service has instances, serviceName : " + serviceName);
}

consistencyService.remove(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName));
}

Expand Down
Expand Up @@ -70,8 +70,8 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
try {
String path = new URI(req.getRequestURI()).getPath();
if (ServerMode.AP.name().equals(switchDomain.getServerMode()) && !HttpMethod.GET.equals(req.getMethod())) {
if (path.startsWith(UtilsAndCommons.NACOS_NAMING_CONTEXT + UtilsAndCommons.NACOS_NAMING_SERVICE_CONTEXT)
|| path.startsWith(UtilsAndCommons.NACOS_NAMING_CONTEXT + UtilsAndCommons.NACOS_NAMING_CLUSTER_CONTEXT)) {
if (path.contains(UtilsAndCommons.NACOS_NAMING_CONTEXT + UtilsAndCommons.NACOS_NAMING_SERVICE_CONTEXT)
|| path.contains(UtilsAndCommons.NACOS_NAMING_CONTEXT + UtilsAndCommons.NACOS_NAMING_CLUSTER_CONTEXT)) {
resp.getWriter().write("server in AP mode, request: " + req.getMethod() + " " + path + " not permitted");
resp.setStatus(HttpServletResponse.SC_FORBIDDEN);
return;
Expand Down

0 comments on commit f6ea504

Please sign in to comment.