From 2781e18cc2cc3daa7ae9786e32a63a572bddc0a2 Mon Sep 17 00:00:00 2001 From: Jason Song Date: Wed, 20 Apr 2016 10:22:25 +0800 Subject: [PATCH] Add client side long polling support and server side mock impl --- .../internals/ConfigServiceLocator.java | 6 +- .../ctrip/apollo/internals/DefaultConfig.java | 7 + .../internals/LocalFileConfigRepository.java | 4 +- .../internals/RemoteConfigRepository.java | 132 ++++++++++++++++-- .../ctrip/apollo/internals/SimpleConfig.java | 2 + .../ctrip/apollo/model/ConfigChangeEvent.java | 3 +- .../com/ctrip/apollo/util/http/HttpUtil.java | 21 +-- .../integration/ConfigIntegrationTest.java | 66 ++++++++- .../internals/RemoteConfigRepositoryTest.java | 52 ++++++- .../controller/ConfigController.java | 2 +- .../controller/NotificationController.java | 82 +++++++++++ 11 files changed, 349 insertions(+), 28 deletions(-) create mode 100644 apollo-configservice/src/main/java/com/ctrip/apollo/configservice/controller/NotificationController.java diff --git a/apollo-client/src/main/java/com/ctrip/apollo/internals/ConfigServiceLocator.java b/apollo-client/src/main/java/com/ctrip/apollo/internals/ConfigServiceLocator.java index 4a61b6c79b0..5421e347c96 100644 --- a/apollo-client/src/main/java/com/ctrip/apollo/internals/ConfigServiceLocator.java +++ b/apollo-client/src/main/java/com/ctrip/apollo/internals/ConfigServiceLocator.java @@ -30,7 +30,7 @@ public class ConfigServiceLocator { private Type m_responseType; /** - * Create a config service locator + * Create a config service locator. */ public ConfigServiceLocator() { List initial = Lists.newArrayList(); @@ -88,8 +88,8 @@ private void updateConfigServices() { throw new RuntimeException("Get config services failed", exception); } - private void logConfigServicesToCat(List serviceDTOs) { - for (ServiceDTO serviceDTO : serviceDTOs) { + private void logConfigServicesToCat(List serviceDtos) { + for (ServiceDTO serviceDTO : serviceDtos) { Cat.logEvent("Apollo.Config.Services", serviceDTO.getHomepageUrl()); } } diff --git a/apollo-client/src/main/java/com/ctrip/apollo/internals/DefaultConfig.java b/apollo-client/src/main/java/com/ctrip/apollo/internals/DefaultConfig.java index 9331d5de2eb..b340ddcf198 100644 --- a/apollo-client/src/main/java/com/ctrip/apollo/internals/DefaultConfig.java +++ b/apollo-client/src/main/java/com/ctrip/apollo/internals/DefaultConfig.java @@ -95,7 +95,14 @@ public synchronized void onRepositoryChange(String namespace, Properties newProp Map actualChanges = updateAndCalcConfigChanges(newConfigProperties); + //check double checked result + if (actualChanges.isEmpty()) { + return; + } + this.fireConfigChange(new ConfigChangeEvent(m_namespace, actualChanges)); + + Cat.logEvent("Apollo.Client.ConfigChanges", m_namespace); } private Map updateAndCalcConfigChanges(Properties newConfigProperties) { diff --git a/apollo-client/src/main/java/com/ctrip/apollo/internals/LocalFileConfigRepository.java b/apollo-client/src/main/java/com/ctrip/apollo/internals/LocalFileConfigRepository.java index f502eacb95e..2a56d682729 100644 --- a/apollo-client/src/main/java/com/ctrip/apollo/internals/LocalFileConfigRepository.java +++ b/apollo-client/src/main/java/com/ctrip/apollo/internals/LocalFileConfigRepository.java @@ -120,7 +120,9 @@ private void trySyncFromFallback() { updateFileProperties(properties); } catch (Throwable ex) { Cat.logError(ex); - logger.warn("Sync config from fallback repository {} failed, reason: {}", m_fallback.getClass(), ex); + logger + .warn("Sync config from fallback repository {} failed, reason: {}", m_fallback.getClass(), + ex); } } diff --git a/apollo-client/src/main/java/com/ctrip/apollo/internals/RemoteConfigRepository.java b/apollo-client/src/main/java/com/ctrip/apollo/internals/RemoteConfigRepository.java index e833a8468c3..8d09f7fdc94 100644 --- a/apollo-client/src/main/java/com/ctrip/apollo/internals/RemoteConfigRepository.java +++ b/apollo-client/src/main/java/com/ctrip/apollo/internals/RemoteConfigRepository.java @@ -1,9 +1,14 @@ package com.ctrip.apollo.internals; +import com.google.common.base.Joiner; import com.google.common.base.Strings; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.escape.Escaper; +import com.google.common.net.UrlEscapers; import com.ctrip.apollo.core.dto.ApolloConfig; +import com.ctrip.apollo.core.dto.ApolloConfigNotification; import com.ctrip.apollo.core.dto.ServiceDTO; import com.ctrip.apollo.core.utils.ApolloThreadFactory; import com.ctrip.apollo.util.ConfigUtil; @@ -22,10 +27,14 @@ import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** @@ -40,6 +49,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository { private volatile AtomicReference m_configCache; private final String m_namespace; private final ScheduledExecutorService m_executorService; + private final AtomicBoolean m_longPollingStopped; /** * Constructor. @@ -58,10 +68,12 @@ public RemoteConfigRepository(String namespace) { Cat.logError(ex); throw new IllegalStateException("Unable to load component!", ex); } + this.m_longPollingStopped = new AtomicBoolean(false); this.m_executorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory.create("RemoteConfigRepository", true)); this.trySync(); this.schedulePeriodicRefresh(); + this.scheduleLongPollingRefresh(); } @Override @@ -84,7 +96,10 @@ private void schedulePeriodicRefresh() { new Runnable() { @Override public void run() { + Transaction transaction = Cat.newTransaction("Apollo.ConfigService", "periodicRefresh"); trySync(); + transaction.setStatus(Message.SUCCESS); + transaction.complete(); } }, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(), m_configUtil.getRefreshTimeUnit()); @@ -113,11 +128,11 @@ private Properties transformApolloConfigToProperties(ApolloConfig apolloConfig) return result; } - private ApolloConfig loadApolloConfig() { String appId = m_configUtil.getAppId(); String cluster = m_configUtil.getCluster(); - Cat.logEvent("Apollo.Client.Config", String.format("%s-%s-%s", appId, cluster, m_namespace)); + Cat.logEvent("Apollo.Client.ConfigInfo", + String.format("%s-%s-%s", appId, cluster, m_namespace)); int maxRetries = 2; Throwable exception = null; @@ -128,7 +143,7 @@ private ApolloConfig loadApolloConfig() { for (ServiceDTO configService : randomConfigServices) { String url = - assembleUrl(configService.getHomepageUrl(), appId, cluster, m_namespace, + assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace, m_configCache.get()); logger.debug("Loading config from {}", url); @@ -172,18 +187,19 @@ private ApolloConfig loadApolloConfig() { throw new RuntimeException(message, exception); } - private String assembleUrl(String uri, String appId, String cluster, String namespace, - ApolloConfig previousConfig) { + private String assembleQueryConfigUrl(String uri, String appId, String cluster, String namespace, + ApolloConfig previousConfig) { + Escaper escaper = UrlEscapers.urlPathSegmentEscaper(); String path = "configs/%s/%s"; - List params = Lists.newArrayList(appId, cluster); + List params = Lists.newArrayList(escaper.escape(appId), escaper.escape(cluster)); if (!Strings.isNullOrEmpty(namespace)) { path = path + "/%s"; - params.add(namespace); + params.add(escaper.escape(namespace)); } if (previousConfig != null) { path = path + "?releaseId=%s"; - params.add(String.valueOf(previousConfig.getReleaseId())); + params.add(escaper.escape(String.valueOf(previousConfig.getReleaseId()))); } String pathExpanded = String.format(path, params.toArray()); @@ -193,6 +209,106 @@ private String assembleUrl(String uri, String appId, String cluster, String name return uri + pathExpanded; } + private void scheduleLongPollingRefresh() { + final String appId = m_configUtil.getAppId(); + final String cluster = m_configUtil.getCluster(); + final ExecutorService longPollingService = + Executors.newFixedThreadPool(2, + ApolloThreadFactory.create("RemoteConfigRepository-LongPolling", true)); + longPollingService.submit(new Runnable() { + @Override + public void run() { + doLongPollingRefresh(appId, cluster, longPollingService); + } + }); + } + + private void doLongPollingRefresh(String appId, String cluster, + ExecutorService longPollingService) { + final Random random = new Random(); + ServiceDTO lastServiceDto = null; + Transaction transaction = null; + while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) { + try { + if (lastServiceDto == null) { + List configServices = getConfigServices(); + lastServiceDto = configServices.get(random.nextInt(configServices.size())); + } + + String url = + assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, + m_namespace, m_configCache.get()); + + logger.debug("Long polling from {}", url); + HttpRequest request = new HttpRequest(url); + //no timeout for read + request.setReadTimeout(0); + + transaction = Cat.newTransaction("Apollo.ConfigService", "pollNotification"); + transaction.addData("Url", url); + + HttpResponse response = + m_httpUtil.doGet(request, ApolloConfigNotification.class); + + logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url); + if (response.getStatusCode() == 200) { + longPollingService.submit(new Runnable() { + @Override + public void run() { + trySync(); + } + }); + } + transaction.addData("StatusCode", response.getStatusCode()); + transaction.setStatus(Message.SUCCESS); + } catch (Throwable ex) { + logger.warn("Long polling failed for appId: {}, cluster: {}, namespace: {}, reason: {}", + appId, cluster, m_namespace, ex); + lastServiceDto = null; + Cat.logError(ex); + if (transaction != null) { + transaction.setStatus(ex); + } + try { + TimeUnit.SECONDS.sleep(10); + } catch (InterruptedException ie) { + //ignore + } + } finally { + if (transaction != null) { + transaction.complete(); + } + } + } + } + + private String assembleLongPollRefreshUrl(String uri, String appId, String cluster, + String namespace, + ApolloConfig previousConfig) { + Escaper escaper = UrlEscapers.urlPathSegmentEscaper(); + Map queryParams = Maps.newHashMap(); + queryParams.put("appId", escaper.escape(appId)); + queryParams.put("cluster", escaper.escape(cluster)); + + if (!Strings.isNullOrEmpty(namespace)) { + queryParams.put("namespace", escaper.escape(namespace)); + } + if (previousConfig != null) { + queryParams.put("releaseId", escaper.escape(previousConfig.getReleaseId())); + } + + String params = Joiner.on("&").withKeyValueSeparator("=").join(queryParams); + if (!uri.endsWith("/")) { + uri += "/"; + } + + return uri + "notifications?" + params; + } + + void stopLongPollingRefresh() { + this.m_longPollingStopped.compareAndSet(false, true); + } + private List getConfigServices() { List services = m_serviceLocator.getConfigServices(); if (services.size() == 0) { diff --git a/apollo-client/src/main/java/com/ctrip/apollo/internals/SimpleConfig.java b/apollo-client/src/main/java/com/ctrip/apollo/internals/SimpleConfig.java index 81ce44eebaa..8116bf89231 100644 --- a/apollo-client/src/main/java/com/ctrip/apollo/internals/SimpleConfig.java +++ b/apollo-client/src/main/java/com/ctrip/apollo/internals/SimpleConfig.java @@ -74,5 +74,7 @@ public String apply(ConfigChange input) { m_configProperties = newConfigProperties; this.fireConfigChange(new ConfigChangeEvent(m_namespace, changeMap)); + + Cat.logEvent("Apollo.Client.ConfigChanges", m_namespace); } } diff --git a/apollo-client/src/main/java/com/ctrip/apollo/model/ConfigChangeEvent.java b/apollo-client/src/main/java/com/ctrip/apollo/model/ConfigChangeEvent.java index 72ab39d2dea..8a2e0991602 100644 --- a/apollo-client/src/main/java/com/ctrip/apollo/model/ConfigChangeEvent.java +++ b/apollo-client/src/main/java/com/ctrip/apollo/model/ConfigChangeEvent.java @@ -40,7 +40,8 @@ public ConfigChange getChange(String key) { } /** - * Get the changes. Please note that the returned Map is immutable. + * Get the changes as map. + * Please note that the returned Map is immutable. * @return changes */ public Map getChanges() { diff --git a/apollo-client/src/main/java/com/ctrip/apollo/util/http/HttpUtil.java b/apollo-client/src/main/java/com/ctrip/apollo/util/http/HttpUtil.java index 372acb29433..8dc9d846c5c 100644 --- a/apollo-client/src/main/java/com/ctrip/apollo/util/http/HttpUtil.java +++ b/apollo-client/src/main/java/com/ctrip/apollo/util/http/HttpUtil.java @@ -27,20 +27,23 @@ public class HttpUtil { private ConfigUtil m_configUtil; private Gson gson; private String basicAuth; - + + /** + * Constructor. + */ public HttpUtil() { gson = new Gson(); try { - basicAuth = "Basic " + BaseEncoding.base64().encode("user:".getBytes("UTF-8")); - } catch (UnsupportedEncodingException e) { - e.printStackTrace(); + basicAuth = "Basic " + BaseEncoding.base64().encode("user:".getBytes("UTF-8")); + } catch (UnsupportedEncodingException ex) { + ex.printStackTrace(); } } /** * Do get operation for the http request. * - * @param httpRequest the request + * @param httpRequest the request * @param responseType the response type * @return the response * @throws RuntimeException if any error happened or response code is neither 200 nor 304 @@ -59,7 +62,7 @@ public T apply(String input) { /** * Do get operation for the http request. * - * @param httpRequest the request + * @param httpRequest the request * @param responseType the response type * @return the response * @throws RuntimeException if any error happened or response code is neither 200 nor 304 @@ -76,14 +79,14 @@ public T apply(String input) { } private HttpResponse doGetWithSerializeFunction(HttpRequest httpRequest, - Function serializeFunction) { + Function serializeFunction) { InputStream is = null; try { HttpURLConnection conn = (HttpURLConnection) new URL(httpRequest.getUrl()).openConnection(); conn.setRequestMethod("GET"); - conn.setRequestProperty ("Authorization", basicAuth); - + conn.setRequestProperty("Authorization", basicAuth); + int connectTimeout = httpRequest.getConnectTimeout(); if (connectTimeout < 0) { connectTimeout = m_configUtil.getConnectTimeout(); diff --git a/apollo-client/src/test/java/com/ctrip/apollo/integration/ConfigIntegrationTest.java b/apollo-client/src/test/java/com/ctrip/apollo/integration/ConfigIntegrationTest.java index caff825d36c..f0c60b1f308 100644 --- a/apollo-client/src/test/java/com/ctrip/apollo/integration/ConfigIntegrationTest.java +++ b/apollo-client/src/test/java/com/ctrip/apollo/integration/ConfigIntegrationTest.java @@ -9,6 +9,7 @@ import com.ctrip.apollo.ConfigService; import com.ctrip.apollo.core.ConfigConsts; import com.ctrip.apollo.core.dto.ApolloConfig; +import com.ctrip.apollo.core.dto.ApolloConfigNotification; import com.ctrip.apollo.core.utils.ClassLoaderUtil; import com.ctrip.apollo.model.ConfigChangeEvent; @@ -196,6 +197,7 @@ public void testRefreshConfig() throws Exception { config.addChangeListener(new ConfigChangeListener() { AtomicInteger counter = new AtomicInteger(0); + @Override public void onChange(ConfigChangeEvent changeEvent) { //only need to assert once @@ -220,6 +222,66 @@ public void onChange(ConfigChangeEvent changeEvent) { assertEquals(anotherValue, config.getProperty(someKey, null)); } + @Test + public void testLongPollRefresh() throws Exception { + final String someKey = "someKey"; + final String someValue = "someValue"; + final String anotherValue = "anotherValue"; + + Map configurations = Maps.newHashMap(); + configurations.put(someKey, someValue); + ApolloConfig apolloConfig = assembleApolloConfig(configurations); + ContextHandler configHandler = mockConfigServerHandler(HttpServletResponse.SC_OK, apolloConfig); + ContextHandler pollHandler = + mockPollNotificationHandler(50, HttpServletResponse.SC_OK, + new ApolloConfigNotification(apolloConfig.getAppId(), apolloConfig.getCluster(), + apolloConfig.getNamespace()), false); + + startServerWithHandlers(configHandler, pollHandler); + + Config config = ConfigService.getConfig(); + assertEquals(someValue, config.getProperty(someKey, null)); + + apolloConfig.getConfigurations().put(someKey, anotherValue); + + TimeUnit.MILLISECONDS.sleep(60); + + assertEquals(anotherValue, config.getProperty(someKey, null)); + + } + + private ContextHandler mockPollNotificationHandler(final long pollResultTimeOutInMS, + final int statusCode, + final ApolloConfigNotification result, + final boolean failedAtFirstTime) { + ContextHandler context = new ContextHandler("/notifications"); + context.setHandler(new AbstractHandler() { + AtomicInteger counter = new AtomicInteger(0); + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, + HttpServletResponse response) throws IOException, ServletException { + if (failedAtFirstTime && counter.incrementAndGet() == 1) { + response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + baseRequest.setHandled(true); + return; + } + + try { + TimeUnit.MILLISECONDS.sleep(pollResultTimeOutInMS); + } catch (InterruptedException e) { + } + + response.setContentType("application/json;charset=UTF-8"); + response.setStatus(statusCode); + response.getWriter().println(gson.toJson(result)); + baseRequest.setHandled(true); + } + }); + + return context; + } + private ContextHandler mockConfigServerHandler(final int statusCode, final ApolloConfig result, final boolean failedAtFirstTime) { ContextHandler context = new ContextHandler("/configs/*"); @@ -237,15 +299,11 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques response.setContentType("application/json;charset=UTF-8"); response.setStatus(statusCode); - response.getWriter().println(gson.toJson(result)); - baseRequest.setHandled(true); } }); - return context; - } diff --git a/apollo-client/src/test/java/com/ctrip/apollo/internals/RemoteConfigRepositoryTest.java b/apollo-client/src/test/java/com/ctrip/apollo/internals/RemoteConfigRepositoryTest.java index 36372f9c7b8..c4f28bb222b 100644 --- a/apollo-client/src/test/java/com/ctrip/apollo/internals/RemoteConfigRepositoryTest.java +++ b/apollo-client/src/test/java/com/ctrip/apollo/internals/RemoteConfigRepositoryTest.java @@ -5,6 +5,7 @@ import com.google.common.collect.Maps; import com.ctrip.apollo.core.dto.ApolloConfig; +import com.ctrip.apollo.core.dto.ApolloConfigNotification; import com.ctrip.apollo.core.dto.ServiceDTO; import com.ctrip.apollo.util.ConfigUtil; import com.ctrip.apollo.util.http.HttpRequest; @@ -22,6 +23,9 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import javax.servlet.http.HttpServletResponse; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.eq; @@ -41,6 +45,8 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase { @Mock private static HttpResponse someResponse; @Mock + private static HttpResponse pollResponse; + @Mock private ConfigUtil someConfigUtil; @Before @@ -48,6 +54,8 @@ public void setUp() throws Exception { super.setUp(); someNamespace = "someName"; + when(pollResponse.getStatusCode()).thenReturn(HttpServletResponse.SC_NOT_MODIFIED); + defineComponent(ConfigUtil.class, MockConfigUtil.class); defineComponent(ConfigServiceLocator.class, MockConfigServiceLocator.class); defineComponent(HttpUtil.class, MockHttpUtil.class); @@ -65,9 +73,11 @@ public void testLoadConfig() throws Exception { when(someResponse.getBody()).thenReturn(someApolloConfig); RemoteConfigRepository remoteConfigRepository = new RemoteConfigRepository(someNamespace); + Properties config = remoteConfigRepository.getConfig(); assertEquals(configurations, config); + remoteConfigRepository.stopLongPollingRefresh(); } @Test(expected = RuntimeException.class) @@ -76,6 +86,10 @@ public void testGetRemoteConfigWithServerError() throws Exception { when(someResponse.getStatusCode()).thenReturn(500); RemoteConfigRepository remoteConfigRepository = new RemoteConfigRepository(someNamespace); + + //must stop the long polling before exception occurred + remoteConfigRepository.stopLongPollingRefresh(); + remoteConfigRepository.getConfig(); } @@ -102,6 +116,35 @@ public void testRepositoryChangeListener() throws Exception { verify(someListener, times(1)).onRepositoryChange(eq(someNamespace), captor.capture()); assertEquals(newConfigurations, captor.getValue()); + + remoteConfigRepository.stopLongPollingRefresh(); + } + + @Test + public void testLongPollingRefresh() throws Exception { + Map configurations = ImmutableMap.of("someKey", "someValue"); + ApolloConfig someApolloConfig = assembleApolloConfig(configurations); + + when(someResponse.getStatusCode()).thenReturn(200); + when(someResponse.getBody()).thenReturn(someApolloConfig); + + RepositoryChangeListener someListener = mock(RepositoryChangeListener.class); + RemoteConfigRepository remoteConfigRepository = new RemoteConfigRepository(someNamespace); + remoteConfigRepository.addChangeListener(someListener); + final ArgumentCaptor captor = ArgumentCaptor.forClass(Properties.class); + + Map newConfigurations = ImmutableMap.of("someKey", "anotherValue"); + ApolloConfig newApolloConfig = assembleApolloConfig(newConfigurations); + + when(pollResponse.getStatusCode()).thenReturn(HttpServletResponse.SC_OK); + when(someResponse.getBody()).thenReturn(newApolloConfig); + + TimeUnit.MILLISECONDS.sleep(60); + + remoteConfigRepository.stopLongPollingRefresh(); + + verify(someListener, times(1)).onRepositoryChange(eq(someNamespace), captor.capture()); + assertEquals(newConfigurations, captor.getValue()); } private ApolloConfig assembleApolloConfig(Map configurations) { @@ -109,7 +152,7 @@ private ApolloConfig assembleApolloConfig(Map configurations) { String someClusterName = "cluster"; String someReleaseId = "1"; ApolloConfig apolloConfig = - new ApolloConfig(someAppId, someClusterName, someNamespace, someReleaseId); + new ApolloConfig(someAppId, someClusterName, someNamespace, someReleaseId); apolloConfig.setConfigurations(configurations); @@ -143,6 +186,13 @@ public List getConfigServices() { public static class MockHttpUtil extends HttpUtil { @Override public HttpResponse doGet(HttpRequest httpRequest, Class responseType) { + if (httpRequest.getUrl().contains("notifications?")) { + try { + TimeUnit.MILLISECONDS.sleep(50); + } catch (InterruptedException e) { + } + return (HttpResponse) pollResponse; + } return (HttpResponse) someResponse; } } diff --git a/apollo-configservice/src/main/java/com/ctrip/apollo/configservice/controller/ConfigController.java b/apollo-configservice/src/main/java/com/ctrip/apollo/configservice/controller/ConfigController.java index db9b79af2ac..69516322cde 100644 --- a/apollo-configservice/src/main/java/com/ctrip/apollo/configservice/controller/ConfigController.java +++ b/apollo-configservice/src/main/java/com/ctrip/apollo/configservice/controller/ConfigController.java @@ -40,7 +40,7 @@ public ApolloConfig queryConfig(@PathVariable String appId, @PathVariable String @RequestParam(value = "releaseId", defaultValue = "-1") String clientSideReleaseId, HttpServletResponse response) throws IOException { Release release = configService.findRelease(appId, clusterName, namespace); - //TODO if namespace != application, should also query config by namespace and DC? + //TODO if namespace != application, should also query config by namespace and DC(default if DC not found)? //And if found, should merge config, as well as releaseId -> make releaseId a string? if (release == null) { response.sendError(HttpServletResponse.SC_NOT_FOUND, diff --git a/apollo-configservice/src/main/java/com/ctrip/apollo/configservice/controller/NotificationController.java b/apollo-configservice/src/main/java/com/ctrip/apollo/configservice/controller/NotificationController.java new file mode 100644 index 00000000000..b3a6b016c1c --- /dev/null +++ b/apollo-configservice/src/main/java/com/ctrip/apollo/configservice/controller/NotificationController.java @@ -0,0 +1,82 @@ +package com.ctrip.apollo.configservice.controller; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; + +import com.ctrip.apollo.core.ConfigConsts; +import com.ctrip.apollo.core.dto.ApolloConfigNotification; +import com.ctrip.apollo.core.utils.ApolloThreadFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.request.async.DeferredResult; + +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import javax.servlet.http.HttpServletResponse; + +/** + * @author Jason Song(song_s@ctrip.com) + */ +@RestController +@RequestMapping("/notifications") +public class NotificationController { + private static final Logger logger = LoggerFactory.getLogger(NotificationController.class); + private final static long TIMEOUT = 60 * 60 * 1000;//60 MINUTES + private final Multimap> deferredResults = + Multimaps.synchronizedSetMultimap(HashMultimap.create()); + + { + startRandomChange(); + } + + @RequestMapping(method = RequestMethod.GET) + public DeferredResult pollNotification( + @RequestParam(value = "appId") String appId, + @RequestParam(value = "cluster") String cluster, + @RequestParam(value = "namespace", defaultValue = ConfigConsts.NAMESPACE_APPLICATION) String namespace, + @RequestParam(value = "releaseId", defaultValue = "-1") String clientSideReleaseId, + HttpServletResponse response) { + DeferredResult deferredResult = + new DeferredResult<>(TIMEOUT); + String key = assembleKey(appId, cluster, namespace); + this.deferredResults.put(key, deferredResult); + + deferredResult.onCompletion(() -> { + logger.info("deferred result for {} {} {} completed", appId, cluster, namespace); + deferredResults.remove(key, deferredResult); + }); + + deferredResult.onTimeout(() -> { + logger.info("deferred result for {} {} {} timeout", appId, cluster, namespace); + response.setStatus(HttpServletResponse.SC_NOT_MODIFIED); + }); + + logger.info("deferred result for {} {} {} returned", appId, cluster, namespace); + return deferredResult; + } + + private void startRandomChange() { + Random random = new Random(); + ScheduledExecutorService testService = Executors.newScheduledThreadPool(1, + ApolloThreadFactory.create("NotificationController", true)); + testService.scheduleAtFixedRate((Runnable) () -> deferredResults + .entries().stream().filter(entry -> random.nextBoolean()).forEach(entry -> { + String[] keys = entry.getKey().split("-"); + entry.getValue().setResult(new ApolloConfigNotification(keys[0], keys[1], keys[2])); + }), 30, 30, TimeUnit.SECONDS); + } + + private String assembleKey(String appId, String cluster, String namespace) { + return String.format("%s-%s-%s", appId, cluster, namespace); + } +} +