Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,22 +102,32 @@ public static void run() throws Exception {
ArchaiusUtils.setProperty("cse.loadbalance.strategy.name", "WeightedResponse");
testStringArray(test);
boolean checkerStated = false;
Set<Thread> allThreads = Thread.getAllStackTraces().keySet();
for (Thread t : allThreads) {
if (t.getName().equals("NFLoadBalancer-serverWeightTimer-unknown")) {
checkerStated = true;
// Timer may not start thread very fast so check for 3 times.
for (int i = 0; i < 3; i++) {
Set<Thread> allThreads = Thread.getAllStackTraces().keySet();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

java.lang.Thread#getThreads is cheaper

for (Thread t : allThreads) {
if (t.getName().equals("NFLoadBalancer-serverWeightTimer-unknown")) {
checkerStated = true;
break;
}
}
Thread.sleep(1000);
}
TestMgr.check(checkerStated, true);

ArchaiusUtils.setProperty("cse.loadbalance.strategy.name", "RoundRobin");
testStringArray(test);

allThreads = Thread.getAllStackTraces().keySet();

boolean checkerDestroyed = true;
for (Thread t : allThreads) {
if (t.getName().equals("NFLoadBalancer-serverWeightTimer-unknown")) {
checkerDestroyed = false;
// Timer cancel may not destroy thread very fast so check for 3 times.
for (int i = 0; i < 3; i++) {
checkerDestroyed = true;
Set<Thread> allThreads = Thread.getAllStackTraces().keySet();
for (Thread t : allThreads) {
if (t.getName().equals("NFLoadBalancer-serverWeightTimer-unknown")) {
checkerDestroyed = false;
Thread.sleep(1000);
}
}
}
TestMgr.check(checkerDestroyed, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public class ConfigCenterClient {

private static final long HEARTBEAT_INTERVAL = 30000;

private static final long BOOTUP_WAIT_TIME = 10;

private ScheduledExecutorService heartbeatTask = null;

private int refreshMode = CONFIG_CENTER_CONFIG.getRefreshMode();
Expand Down Expand Up @@ -129,7 +131,9 @@ public void connectServer() {
throw new IllegalStateException(e);
}
refreshMembers(memberDiscovery);
EXECUTOR.scheduleWithFixedDelay(new ConfigRefresh(parseConfigUtils, memberDiscovery),
ConfigRefresh refreshTask = new ConfigRefresh(parseConfigUtils, memberDiscovery);
refreshTask.run(true);
EXECUTOR.scheduleWithFixedDelay(refreshTask,
firstRefreshInterval,
refreshInterval,
TimeUnit.MILLISECONDS);
Expand All @@ -155,6 +159,9 @@ private void refreshMembers(MemberDiscovery memberDiscovery) {
request.headers().add("X-Auth-Token", ConfigCenterConfig.INSTANCE.getToken());
}
authHeaderProviders.forEach(provider -> request.headers().addAll(provider.getSignAuthHeaders(signReq)));
request.exceptionHandler(e -> {
LOGGER.error("Fetch member from {} failed. Error message is [{}].", configCenter, e.getMessage());
});
request.end();
});
}
Expand Down Expand Up @@ -211,25 +218,29 @@ class ConfigRefresh implements Runnable {
this.memberdis = memberdis;
}

// 具体动作
@Override
public void run() {
public void run(boolean wait) {
// this will be single threaded, so we don't care about concurrent
// staffs
try {
String configCenter = memberdis.getConfigServer();
if (refreshMode == 1) {
refreshConfig(configCenter);
refreshConfig(configCenter, wait);
} else if (!isWatching) {
// 重新监听时需要先加载,避免在断开期间丢失变更
refreshConfig(configCenter);
refreshConfig(configCenter, wait);
doWatch(configCenter);
}
} catch (Exception e) {
LOGGER.error("client refresh thread exception", e);
}
}

// 具体动作
@Override
public void run() {
run(false);
}

// create watch and wait for done
public void doWatch(String configCenter)
throws UnsupportedEncodingException, InterruptedException {
Expand Down Expand Up @@ -270,7 +281,7 @@ public void doWatch(String configCenter)
LOGGER.info("watching config recieved {}", action);
Map<String, Object> mAction = action.toJsonObject().getMap();
if ("CREATE".equals(mAction.get("action"))) {
refreshConfig(configCenter);
refreshConfig(configCenter, false);
} else if ("MEMBER_CHANGE".equals(mAction.get("action"))) {
refreshMembers(memberdis);
} else {
Expand All @@ -282,7 +293,10 @@ public void doWatch(String configCenter)
waiter.countDown();
},
e -> {
LOGGER.error("watcher connect to config center {} refresh port {} failed. Error message is [{}]", configCenter, refreshPort, e.getMessage());
LOGGER.error("watcher connect to config center {} refresh port {} failed. Error message is [{}]",
configCenter,
refreshPort,
e.getMessage());
waiter.countDown();
});
});
Expand Down Expand Up @@ -313,7 +327,8 @@ private void sendHeartbeat(WebSocket ws) {
}
}

public void refreshConfig(String configcenter) {
public void refreshConfig(String configcenter, boolean wait) {
CountDownLatch latch = new CountDownLatch(1);
clientMgr.findThreadBindClientPool().runOnContext(client -> {
String path = URIConst.ITEMS + "?dimensionsInfo=" + StringUtils.deleteWhitespace(serviceName);
IpPort ipPort = NetUtils.parseIpPortFromURI(configcenter);
Expand All @@ -330,10 +345,12 @@ public void refreshConfig(String configcenter) {
EventManager.post(new ConnFailEvent("config refresh result parse fail " + e.getMessage()));
LOGGER.error("Config refresh from {} failed. Error message is [{}].", configcenter, e.getMessage());
}
latch.countDown();
});
} else {
rsp.bodyHandler(buf -> {
LOGGER.error("Server error message is [{}].", buf);
latch.countDown();
});
EventManager.post(new ConnFailEvent("fetch config fail"));
LOGGER.error("Config refresh from {} failed.", configcenter);
Expand All @@ -354,9 +371,19 @@ public void refreshConfig(String configcenter) {
request.exceptionHandler(e -> {
EventManager.post(new ConnFailEvent("fetch config fail"));
LOGGER.error("Config refresh from {} failed. Error message is [{}].", configcenter, e.getMessage());
latch.countDown();
});
request.end();
});
if (wait) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar wait code here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two steps, so got similar code.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed refresh member wait logic. Because we can use the configured address and it's first time, no need to wait this operation.

LOGGER.info("Refreshing remote config...");
try {
latch.await(BOOTUP_WAIT_TIME, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.warn(e.getMessage());
}
LOGGER.info("Refreshing remote config is done.");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class MemberDiscovery {

private List<String> configServerAddresses = new ArrayList<>();

private Object lock = new Object();

private AtomicInteger counter = new AtomicInteger(0);

public MemberDiscovery(List<String> configCenterUri) {
Expand All @@ -52,11 +54,13 @@ public MemberDiscovery(List<String> configCenterUri) {
}

public String getConfigServer() {
if(configServerAddresses.isEmpty()) {
throw new IllegalStateException("Config center address is not available.");
synchronized (lock) {
if (configServerAddresses.isEmpty()) {
throw new IllegalStateException("Config center address is not available.");
}
int index = Math.abs(counter.get() % configServerAddresses.size());
return configServerAddresses.get(index);
}
int index = Math.abs(counter.get() % configServerAddresses.size());
return configServerAddresses.get(index);
}

@Subscribe
Expand All @@ -65,17 +69,22 @@ public void onConnFailEvent(ConnFailEvent e) {
}

public void refreshMembers(JsonObject members) {
configServerAddresses.clear();
List<String> newServerAddresses = new ArrayList<>();
members.getJsonArray("instances").forEach(m -> {
JsonObject instance = (JsonObject) m;
if ("UP".equals(instance.getString("status", "UP"))) {
String endpoint = instance.getJsonArray("endpoints").getString(0);
String scheme = instance.getBoolean("isHttps", false) ? "https" : "http";
configServerAddresses.add(scheme + SCHEMA_SEPRATOR
newServerAddresses.add(scheme + SCHEMA_SEPRATOR
+ endpoint.substring(endpoint.indexOf(SCHEMA_SEPRATOR) + SCHEMA_SEPRATOR.length()));
}
});
Collections.shuffle(configServerAddresses);
LOGGER.info("config center members: {}", configServerAddresses);

synchronized (lock) {
this.configServerAddresses.clear();
this.configServerAddresses.addAll(newServerAddresses);
Collections.shuffle(this.configServerAddresses);
}
LOGGER.info("New config center members: {}", this.configServerAddresses);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ private static void duplicateServiceCombConfigToCse(ConcurrentCompositeConfigura
compositeConfiguration.addConfiguration(source, sourceName);
}

public static DynamicWatchedConfiguration createConfigFromConfigCenter(Configuration localConfiguration) {
private static ConfigCenterConfigurationSource createConfigCenterConfigurationSource(
Configuration localConfiguration) {
ConfigCenterConfigurationSource configCenterConfigurationSource =
SPIServiceUtils.getTargetService(ConfigCenterConfigurationSource.class);
if (null == configCenterConfigurationSource) {
Expand All @@ -194,26 +195,30 @@ public static DynamicWatchedConfiguration createConfigFromConfigCenter(Configura
LOGGER.info("Config Source serverUri is not correctly configured.");
return null;
}
return configCenterConfigurationSource;
}

configCenterConfigurationSource.init(localConfiguration);
return new DynamicWatchedConfiguration(configCenterConfigurationSource);
private static void createDynamicWatchedConfiguration(
ConcurrentCompositeConfiguration localConfiguration,
ConfigCenterConfigurationSource configCenterConfigurationSource) {
ConcurrentMapConfiguration injectConfig = new ConcurrentMapConfiguration();
localConfiguration.addConfigurationAtFront(injectConfig, "extraInjectConfig");
configCenterConfigurationSource.addUpdateListener(new ServiceCombPropertyUpdateListener(injectConfig));

DynamicWatchedConfiguration configFromConfigCenter =
new DynamicWatchedConfiguration(configCenterConfigurationSource);
duplicateServiceCombConfigToCse(configFromConfigCenter);
localConfiguration.addConfigurationAtFront(configFromConfigCenter, "configCenterConfig");
}

public static AbstractConfiguration createDynamicConfig() {
LOGGER.info("create dynamic config:");
ConcurrentCompositeConfiguration config = ConfigUtil.createLocalConfig();
DynamicWatchedConfiguration configFromConfigCenter = createConfigFromConfigCenter(config);
if (configFromConfigCenter != null) {
ConcurrentMapConfiguration injectConfig = new ConcurrentMapConfiguration();
config.addConfigurationAtFront(injectConfig, "extraInjectConfig");

duplicateServiceCombConfigToCse(configFromConfigCenter);
config.addConfigurationAtFront(configFromConfigCenter, "configCenterConfig");

configFromConfigCenter.getSource().addUpdateListener(new ServiceCombPropertyUpdateListener(injectConfig));
ConcurrentCompositeConfiguration compositeConfig = ConfigUtil.createLocalConfig();
ConfigCenterConfigurationSource configCenterConfigurationSource =
createConfigCenterConfigurationSource(compositeConfig);
if (configCenterConfigurationSource != null) {
createDynamicWatchedConfiguration(compositeConfig, configCenterConfigurationSource);
}

return config;
return compositeConfig;
}

public static void installDynamicConfig() {
Expand All @@ -222,8 +227,18 @@ public static void installDynamicConfig() {
return;
}

AbstractConfiguration dynamicConfig = ConfigUtil.createDynamicConfig();
ConfigurationManager.install(dynamicConfig);
ConcurrentCompositeConfiguration compositeConfig = ConfigUtil.createLocalConfig();
ConfigCenterConfigurationSource configCenterConfigurationSource =
createConfigCenterConfigurationSource(compositeConfig);
if (configCenterConfigurationSource != null) {
createDynamicWatchedConfiguration(compositeConfig, configCenterConfigurationSource);
}

ConfigurationManager.install(compositeConfig);

if (configCenterConfigurationSource != null) {
configCenterConfigurationSource.init(compositeConfig);
}
}

public static void addExtraConfig(String extraConfigName, Map<String, Object> extraConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Map;

import org.apache.commons.configuration.AbstractConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.servicecomb.config.archaius.sources.ConfigModel;
import org.apache.servicecomb.config.archaius.sources.MicroserviceConfigLoader;
import org.apache.servicecomb.config.spi.ConfigCenterConfigurationSource;
Expand All @@ -49,7 +48,6 @@

import mockit.Deencapsulation;
import mockit.Expectations;
import mockit.Mocked;

public class TestConfigUtil {

Expand Down Expand Up @@ -96,12 +94,6 @@ public void testAddConfig() {
Assert.assertEquals(configuration.getInt("cse.test.num"), 10);
}

@Test
public void testCreateConfigFromConfigCenterNoUrl(@Mocked Configuration localConfiguration) {
AbstractConfiguration configFromConfigCenter = ConfigUtil.createConfigFromConfigCenter(localConfiguration);
Assert.assertNull(configFromConfigCenter);
}

@Test
public void testCreateDynamicConfigNoConfigCenterSPI() {
new Expectations(SPIServiceUtils.class) {
Expand Down