diff --git a/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/PojoClient.java b/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/PojoClient.java index 7ffd70dcd89..3549b47a29c 100644 --- a/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/PojoClient.java +++ b/demo/demo-pojo/pojo-client/src/main/java/org/apache/servicecomb/demo/pojo/client/PojoClient.java @@ -102,22 +102,32 @@ public static void run() throws Exception { ArchaiusUtils.setProperty("cse.loadbalance.strategy.name", "WeightedResponse"); testStringArray(test); boolean checkerStated = false; - Set allThreads = Thread.getAllStackTraces().keySet(); - for (Thread t : allThreads) { - if (t.getName().equals("NFLoadBalancer-serverWeightTimer-unknown")) { - checkerStated = true; + // Timer may not start thread very fast so check for 3 times. + for (int i = 0; i < 3; i++) { + Set allThreads = Thread.getAllStackTraces().keySet(); + for (Thread t : allThreads) { + if (t.getName().equals("NFLoadBalancer-serverWeightTimer-unknown")) { + checkerStated = true; + break; + } } + Thread.sleep(1000); } TestMgr.check(checkerStated, true); - + ArchaiusUtils.setProperty("cse.loadbalance.strategy.name", "RoundRobin"); testStringArray(test); - - allThreads = Thread.getAllStackTraces().keySet(); + boolean checkerDestroyed = true; - for (Thread t : allThreads) { - if (t.getName().equals("NFLoadBalancer-serverWeightTimer-unknown")) { - checkerDestroyed = false; + // Timer cancel may not destroy thread very fast so check for 3 times. + for (int i = 0; i < 3; i++) { + checkerDestroyed = true; + Set allThreads = Thread.getAllStackTraces().keySet(); + for (Thread t : allThreads) { + if (t.getName().equals("NFLoadBalancer-serverWeightTimer-unknown")) { + checkerDestroyed = false; + Thread.sleep(1000); + } } } TestMgr.check(checkerDestroyed, true); diff --git a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java index 4407c1488a7..aaef866f069 100644 --- a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java +++ b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java @@ -86,6 +86,8 @@ public class ConfigCenterClient { private static final long HEARTBEAT_INTERVAL = 30000; + private static final long BOOTUP_WAIT_TIME = 10; + private ScheduledExecutorService heartbeatTask = null; private int refreshMode = CONFIG_CENTER_CONFIG.getRefreshMode(); @@ -129,7 +131,9 @@ public void connectServer() { throw new IllegalStateException(e); } refreshMembers(memberDiscovery); - EXECUTOR.scheduleWithFixedDelay(new ConfigRefresh(parseConfigUtils, memberDiscovery), + ConfigRefresh refreshTask = new ConfigRefresh(parseConfigUtils, memberDiscovery); + refreshTask.run(true); + EXECUTOR.scheduleWithFixedDelay(refreshTask, firstRefreshInterval, refreshInterval, TimeUnit.MILLISECONDS); @@ -155,6 +159,9 @@ private void refreshMembers(MemberDiscovery memberDiscovery) { request.headers().add("X-Auth-Token", ConfigCenterConfig.INSTANCE.getToken()); } authHeaderProviders.forEach(provider -> request.headers().addAll(provider.getSignAuthHeaders(signReq))); + request.exceptionHandler(e -> { + LOGGER.error("Fetch member from {} failed. Error message is [{}].", configCenter, e.getMessage()); + }); request.end(); }); } @@ -211,18 +218,16 @@ class ConfigRefresh implements Runnable { this.memberdis = memberdis; } - // 具体动作 - @Override - public void run() { + public void run(boolean wait) { // this will be single threaded, so we don't care about concurrent // staffs try { String configCenter = memberdis.getConfigServer(); if (refreshMode == 1) { - refreshConfig(configCenter); + refreshConfig(configCenter, wait); } else if (!isWatching) { // 重新监听时需要先加载,避免在断开期间丢失变更 - refreshConfig(configCenter); + refreshConfig(configCenter, wait); doWatch(configCenter); } } catch (Exception e) { @@ -230,6 +235,12 @@ public void run() { } } + // 具体动作 + @Override + public void run() { + run(false); + } + // create watch and wait for done public void doWatch(String configCenter) throws UnsupportedEncodingException, InterruptedException { @@ -270,7 +281,7 @@ public void doWatch(String configCenter) LOGGER.info("watching config recieved {}", action); Map mAction = action.toJsonObject().getMap(); if ("CREATE".equals(mAction.get("action"))) { - refreshConfig(configCenter); + refreshConfig(configCenter, false); } else if ("MEMBER_CHANGE".equals(mAction.get("action"))) { refreshMembers(memberdis); } else { @@ -282,7 +293,10 @@ public void doWatch(String configCenter) waiter.countDown(); }, e -> { - LOGGER.error("watcher connect to config center {} refresh port {} failed. Error message is [{}]", configCenter, refreshPort, e.getMessage()); + LOGGER.error("watcher connect to config center {} refresh port {} failed. Error message is [{}]", + configCenter, + refreshPort, + e.getMessage()); waiter.countDown(); }); }); @@ -313,7 +327,8 @@ private void sendHeartbeat(WebSocket ws) { } } - public void refreshConfig(String configcenter) { + public void refreshConfig(String configcenter, boolean wait) { + CountDownLatch latch = new CountDownLatch(1); clientMgr.findThreadBindClientPool().runOnContext(client -> { String path = URIConst.ITEMS + "?dimensionsInfo=" + StringUtils.deleteWhitespace(serviceName); IpPort ipPort = NetUtils.parseIpPortFromURI(configcenter); @@ -330,10 +345,12 @@ public void refreshConfig(String configcenter) { EventManager.post(new ConnFailEvent("config refresh result parse fail " + e.getMessage())); LOGGER.error("Config refresh from {} failed. Error message is [{}].", configcenter, e.getMessage()); } + latch.countDown(); }); } else { rsp.bodyHandler(buf -> { LOGGER.error("Server error message is [{}].", buf); + latch.countDown(); }); EventManager.post(new ConnFailEvent("fetch config fail")); LOGGER.error("Config refresh from {} failed.", configcenter); @@ -354,9 +371,19 @@ public void refreshConfig(String configcenter) { request.exceptionHandler(e -> { EventManager.post(new ConnFailEvent("fetch config fail")); LOGGER.error("Config refresh from {} failed. Error message is [{}].", configcenter, e.getMessage()); + latch.countDown(); }); request.end(); }); + if (wait) { + LOGGER.info("Refreshing remote config..."); + try { + latch.await(BOOTUP_WAIT_TIME, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.warn(e.getMessage()); + } + LOGGER.info("Refreshing remote config is done."); + } } } diff --git a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/MemberDiscovery.java b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/MemberDiscovery.java index 6db80e6f04e..cee4c046625 100644 --- a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/MemberDiscovery.java +++ b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/MemberDiscovery.java @@ -41,6 +41,8 @@ public class MemberDiscovery { private List configServerAddresses = new ArrayList<>(); + private Object lock = new Object(); + private AtomicInteger counter = new AtomicInteger(0); public MemberDiscovery(List configCenterUri) { @@ -52,11 +54,13 @@ public MemberDiscovery(List configCenterUri) { } public String getConfigServer() { - if(configServerAddresses.isEmpty()) { - throw new IllegalStateException("Config center address is not available."); + synchronized (lock) { + if (configServerAddresses.isEmpty()) { + throw new IllegalStateException("Config center address is not available."); + } + int index = Math.abs(counter.get() % configServerAddresses.size()); + return configServerAddresses.get(index); } - int index = Math.abs(counter.get() % configServerAddresses.size()); - return configServerAddresses.get(index); } @Subscribe @@ -65,17 +69,22 @@ public void onConnFailEvent(ConnFailEvent e) { } public void refreshMembers(JsonObject members) { - configServerAddresses.clear(); + List newServerAddresses = new ArrayList<>(); members.getJsonArray("instances").forEach(m -> { JsonObject instance = (JsonObject) m; if ("UP".equals(instance.getString("status", "UP"))) { String endpoint = instance.getJsonArray("endpoints").getString(0); String scheme = instance.getBoolean("isHttps", false) ? "https" : "http"; - configServerAddresses.add(scheme + SCHEMA_SEPRATOR + newServerAddresses.add(scheme + SCHEMA_SEPRATOR + endpoint.substring(endpoint.indexOf(SCHEMA_SEPRATOR) + SCHEMA_SEPRATOR.length())); } }); - Collections.shuffle(configServerAddresses); - LOGGER.info("config center members: {}", configServerAddresses); + + synchronized (lock) { + this.configServerAddresses.clear(); + this.configServerAddresses.addAll(newServerAddresses); + Collections.shuffle(this.configServerAddresses); + } + LOGGER.info("New config center members: {}", this.configServerAddresses); } } diff --git a/foundations/foundation-config/src/main/java/org/apache/servicecomb/config/ConfigUtil.java b/foundations/foundation-config/src/main/java/org/apache/servicecomb/config/ConfigUtil.java index bb3b4132272..6e2a6f6902b 100644 --- a/foundations/foundation-config/src/main/java/org/apache/servicecomb/config/ConfigUtil.java +++ b/foundations/foundation-config/src/main/java/org/apache/servicecomb/config/ConfigUtil.java @@ -181,7 +181,8 @@ private static void duplicateServiceCombConfigToCse(ConcurrentCompositeConfigura compositeConfiguration.addConfiguration(source, sourceName); } - public static DynamicWatchedConfiguration createConfigFromConfigCenter(Configuration localConfiguration) { + private static ConfigCenterConfigurationSource createConfigCenterConfigurationSource( + Configuration localConfiguration) { ConfigCenterConfigurationSource configCenterConfigurationSource = SPIServiceUtils.getTargetService(ConfigCenterConfigurationSource.class); if (null == configCenterConfigurationSource) { @@ -194,26 +195,30 @@ public static DynamicWatchedConfiguration createConfigFromConfigCenter(Configura LOGGER.info("Config Source serverUri is not correctly configured."); return null; } + return configCenterConfigurationSource; + } - configCenterConfigurationSource.init(localConfiguration); - return new DynamicWatchedConfiguration(configCenterConfigurationSource); + private static void createDynamicWatchedConfiguration( + ConcurrentCompositeConfiguration localConfiguration, + ConfigCenterConfigurationSource configCenterConfigurationSource) { + ConcurrentMapConfiguration injectConfig = new ConcurrentMapConfiguration(); + localConfiguration.addConfigurationAtFront(injectConfig, "extraInjectConfig"); + configCenterConfigurationSource.addUpdateListener(new ServiceCombPropertyUpdateListener(injectConfig)); + + DynamicWatchedConfiguration configFromConfigCenter = + new DynamicWatchedConfiguration(configCenterConfigurationSource); + duplicateServiceCombConfigToCse(configFromConfigCenter); + localConfiguration.addConfigurationAtFront(configFromConfigCenter, "configCenterConfig"); } public static AbstractConfiguration createDynamicConfig() { - LOGGER.info("create dynamic config:"); - ConcurrentCompositeConfiguration config = ConfigUtil.createLocalConfig(); - DynamicWatchedConfiguration configFromConfigCenter = createConfigFromConfigCenter(config); - if (configFromConfigCenter != null) { - ConcurrentMapConfiguration injectConfig = new ConcurrentMapConfiguration(); - config.addConfigurationAtFront(injectConfig, "extraInjectConfig"); - - duplicateServiceCombConfigToCse(configFromConfigCenter); - config.addConfigurationAtFront(configFromConfigCenter, "configCenterConfig"); - - configFromConfigCenter.getSource().addUpdateListener(new ServiceCombPropertyUpdateListener(injectConfig)); + ConcurrentCompositeConfiguration compositeConfig = ConfigUtil.createLocalConfig(); + ConfigCenterConfigurationSource configCenterConfigurationSource = + createConfigCenterConfigurationSource(compositeConfig); + if (configCenterConfigurationSource != null) { + createDynamicWatchedConfiguration(compositeConfig, configCenterConfigurationSource); } - - return config; + return compositeConfig; } public static void installDynamicConfig() { @@ -222,8 +227,18 @@ public static void installDynamicConfig() { return; } - AbstractConfiguration dynamicConfig = ConfigUtil.createDynamicConfig(); - ConfigurationManager.install(dynamicConfig); + ConcurrentCompositeConfiguration compositeConfig = ConfigUtil.createLocalConfig(); + ConfigCenterConfigurationSource configCenterConfigurationSource = + createConfigCenterConfigurationSource(compositeConfig); + if (configCenterConfigurationSource != null) { + createDynamicWatchedConfiguration(compositeConfig, configCenterConfigurationSource); + } + + ConfigurationManager.install(compositeConfig); + + if (configCenterConfigurationSource != null) { + configCenterConfigurationSource.init(compositeConfig); + } } public static void addExtraConfig(String extraConfigName, Map extraConfig) { diff --git a/foundations/foundation-config/src/test/java/org/apache/servicecomb/config/TestConfigUtil.java b/foundations/foundation-config/src/test/java/org/apache/servicecomb/config/TestConfigUtil.java index cd04d4e70ee..119c6add489 100644 --- a/foundations/foundation-config/src/test/java/org/apache/servicecomb/config/TestConfigUtil.java +++ b/foundations/foundation-config/src/test/java/org/apache/servicecomb/config/TestConfigUtil.java @@ -30,7 +30,6 @@ import java.util.Map; import org.apache.commons.configuration.AbstractConfiguration; -import org.apache.commons.configuration.Configuration; import org.apache.servicecomb.config.archaius.sources.ConfigModel; import org.apache.servicecomb.config.archaius.sources.MicroserviceConfigLoader; import org.apache.servicecomb.config.spi.ConfigCenterConfigurationSource; @@ -49,7 +48,6 @@ import mockit.Deencapsulation; import mockit.Expectations; -import mockit.Mocked; public class TestConfigUtil { @@ -96,12 +94,6 @@ public void testAddConfig() { Assert.assertEquals(configuration.getInt("cse.test.num"), 10); } - @Test - public void testCreateConfigFromConfigCenterNoUrl(@Mocked Configuration localConfiguration) { - AbstractConfiguration configFromConfigCenter = ConfigUtil.createConfigFromConfigCenter(localConfiguration); - Assert.assertNull(configFromConfigCenter); - } - @Test public void testCreateDynamicConfigNoConfigCenterSPI() { new Expectations(SPIServiceUtils.class) {