Skip to content

Commit

Permalink
#744 Add server status, #629 Add partition service warm up logic
Browse files Browse the repository at this point in the history
  • Loading branch information
nkorange committed Feb 9, 2019
1 parent cba85f0 commit 58b8d16
Show file tree
Hide file tree
Showing 38 changed files with 607 additions and 290 deletions.
2 changes: 0 additions & 2 deletions api/src/main/java/com/alibaba/nacos/api/common/Constants.java
Expand Up @@ -22,8 +22,6 @@
*/
public class Constants {

public static final String CLIENT_VERSION_HEADER = "Client-Version";

public static final String CLIENT_VERSION = "3.0.0";

public static int DATA_IN_BODY_VERSION = 204;
Expand Down
Expand Up @@ -101,6 +101,7 @@ public List<String> getServerListFromEndpoint() {
String urlString = "http://" + endpoint + "/nacos/serverlist";

List<String> headers = Arrays.asList("Client-Version", UtilAndComs.VERSION,
"User-Agent", UtilAndComs.VERSION,
"Accept-Encoding", "gzip,deflate,sdch",
"Connection", "Keep-Alive",
"RequestId", UuidUtils.generateUuid());
Expand Down Expand Up @@ -301,6 +302,7 @@ public String callServer(String api, Map<String, String> params, String curServe
long end = 0;

List<String> headers = Arrays.asList("Client-Version", UtilAndComs.VERSION,
"User-Agent", UtilAndComs.VERSION,
"Accept-Encoding", "gzip,deflate,sdch",
"Connection", "Keep-Alive",
"RequestId", UuidUtils.generateUuid());
Expand Down
Expand Up @@ -56,6 +56,11 @@ public void doRaftAuth(HttpServletRequest req) throws Exception {
return;
}

agent = req.getHeader("User-Agent");
if (StringUtils.startsWith(agent, UtilsAndCommons.NACOS_SERVER_HEADER)) {
return;
}

throw new IllegalAccessException("illegal access,agent= " + agent + ", token=" + token);
}

Expand Down
@@ -0,0 +1,49 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.cluster;

/**
* A flag to indicate the exact status of a server.
*
* @author nkorange
* @since 1.0.0
*/
public enum ServerStatus {
/**
* server is up and ready for request
*/
UP,
/**
* server is out of service, something abnormal happened
*/
DOWN,
/**
* server is preparing itself for request, usually 'UP' is the next status
*/
STARTING,
/**
* server is manually paused
*/
PAUSED,
/**
* only write operation is permitted.
*/
WRITE_ONLY,
/**
* only read operation is permitted.
*/
READY_ONLY
}
@@ -0,0 +1,76 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.cluster;

import com.alibaba.nacos.naming.consistency.ConsistencyService;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

/**
* Detect and control the working status of local server
*
* @author nkorange
* @since 1.0.0
*/
@Service
public class ServerStatusManager {

@Autowired
private ConsistencyService consistencyService;

@Autowired
private SwitchDomain switchDomain;

private ServerStatus serverStatus = ServerStatus.STARTING;

private boolean serverStatusLocked = false;

@PostConstruct
public void init() {
GlobalExecutor.registerServerStatusUpdater(new ServerStatusUpdater());
}

private void refreshServerStatus() {

if (StringUtils.isNotBlank(switchDomain.getOverriddenServerStatus())) {
serverStatus = ServerStatus.valueOf(switchDomain.getOverriddenServerStatus());
return;
}

if (consistencyService.isAvailable()) {
serverStatus = ServerStatus.UP;
} else {
serverStatus = ServerStatus.DOWN;
}
}

public ServerStatus getServerStatus() {
return serverStatus;
}

public class ServerStatusUpdater implements Runnable {

@Override
public void run() {
refreshServerStatus();
}
}
}
Expand Up @@ -93,4 +93,11 @@ public interface ConsistencyService {
* @return responsible server for the data
*/
String getResponsibleServer(String key);

/**
* Tell the status of this consistency service
*
* @return true if available
*/
boolean isAvailable();
}
Expand Up @@ -20,15 +20,15 @@
import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService;
import com.alibaba.nacos.naming.core.DistroMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

/**
* Publish execution delegate
*
* @author nkorange
* @since 1.0.0
*/
@Component("consistencyDelegate")
@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {

@Autowired
Expand Down Expand Up @@ -94,4 +94,9 @@ public boolean isResponsible(String key) {
public String getResponsibleServer(String key) {
return distroMapper.mapSrv(KeyBuilder.getServiceName(key));
}

@Override
public boolean isAvailable() {
return ephemeralConsistencyService.isAvailable() && persistentConsistencyService.isAvailable();
}
}
Expand Up @@ -15,6 +15,7 @@
*/
package com.alibaba.nacos.naming.consistency.ephemeral.partition;

import com.alibaba.nacos.common.util.IoUtils;
import com.alibaba.nacos.naming.cluster.ServerListManager;
import com.alibaba.nacos.naming.cluster.servers.Server;
import com.alibaba.nacos.naming.cluster.servers.ServerChangeListener;
Expand All @@ -29,12 +30,17 @@
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.commons.lang3.CharEncoding.UTF_8;

/**
* Data replicator
*
Expand All @@ -48,6 +54,9 @@ public class DataSyncer implements ServerChangeListener {
@Autowired
private DataStore dataStore;

@Autowired
private PartitionConfig partitionConfig;

@Autowired
private Serializer serializer;

Expand All @@ -61,6 +70,8 @@ public class DataSyncer implements ServerChangeListener {

private List<Server> servers;

private boolean initialized = false;

@PostConstruct
public void init() {
serverListManager.listen(this);
Expand Down Expand Up @@ -144,23 +155,51 @@ public class TimedSync implements Runnable {
@Override
public void run() {

Map<String, Long> keyTimestamps = new HashMap<>(64);
for (String key : dataStore.keys()) {
if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
continue;
try {

File metaFile = new File(UtilsAndCommons.DATA_BASE_DIR + File.separator + "ephemeral.properties");
if (initialized) {
// write the current instance count to disk:
IoUtils.writeStringToFile(metaFile, "instanceCount=" + dataStore.keys().size(), "UTF-8");
} else {
// check if most of the data are loaded:
List<String> lines = IoUtils.readLines(new InputStreamReader(new FileInputStream(metaFile), UTF_8));
if (lines == null || lines.isEmpty()) {
initialized = true;
} else {
int desiredInstanceCount = Integer.parseInt(lines.get(0).split("=")[1]);
if (desiredInstanceCount * partitionConfig.getInitDataRatio() < dataStore.keys().size()) {
initialized = true;
}
}
}
keyTimestamps.put(key, dataStore.get(key).timestamp.get());
}

if (keyTimestamps.isEmpty()) {
return;
} catch (Exception e) {
Loggers.EPHEMERAL.error("operate on meta file failed.", e);
}

for (Server member : servers) {
if (NetUtils.localServer().equals(member.getKey())) {
continue;
try {
// send local timestamps to other servers:
Map<String, Long> keyTimestamps = new HashMap<>(64);
for (String key : dataStore.keys()) {
if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
continue;
}
keyTimestamps.put(key, dataStore.get(key).timestamp.get());
}

if (keyTimestamps.isEmpty()) {
return;
}

for (Server member : servers) {
if (NetUtils.localServer().equals(member.getKey())) {
continue;
}
NamingProxy.syncTimestamps(keyTimestamps, member.getKey());
}
NamingProxy.syncTimestamps(keyTimestamps, member.getKey());
} catch (Exception e) {
Loggers.EPHEMERAL.error("timed sync task failed.", e);
}
}
}
Expand All @@ -173,6 +212,10 @@ public String buildKey(String key, String targetServer) {
return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer;
}

public boolean isInitialized() {
return initialized;
}

@Override
public void onChangeServerList(List<Server> latestMembers) {

Expand Down
Expand Up @@ -33,11 +33,18 @@ public class PartitionConfig {
@Value("${nacos.naming.partition.batchSyncKeyCount}")
private int batchSyncKeyCount = 1000;

@Value("${nacos.naming.partition.initDataRatio}")
private float initDataRatio = 0.9F;

public int getTaskDispatchPeriod() {
return taskDispatchPeriod;
}

public int getBatchSyncKeyCount() {
return batchSyncKeyCount;
}

public float getInitDataRatio() {
return initDataRatio;
}
}
Expand Up @@ -22,12 +22,11 @@
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.ephemeral.EphemeralConsistencyService;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingProxy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -48,7 +47,7 @@
* @author nkorange
* @since 1.0.0
*/
@Component("partitionConsistencyService")
@Service("partitionConsistencyService")
public class PartitionConsistencyServiceImpl implements EphemeralConsistencyService {

@Autowired
Expand All @@ -60,6 +59,9 @@ public class PartitionConsistencyServiceImpl implements EphemeralConsistencyServ
@Autowired
private TaskDispatcher taskDispatcher;

@Autowired
private DataSyncer dataSyncer;

@Autowired
private Serializer serializer;

Expand Down Expand Up @@ -207,4 +209,9 @@ public boolean isResponsible(String key) {
public String getResponsibleServer(String key) {
return distroMapper.mapSrv(KeyBuilder.getServiceName(key));
}

@Override
public boolean isAvailable() {
return dataSyncer.isInitialized();
}
}

0 comments on commit 58b8d16

Please sign in to comment.