Skip to content

Commit

Permalink
Add client side long polling support and server side mock impl
Browse files Browse the repository at this point in the history
  • Loading branch information
nobodyiam committed Apr 20, 2016
1 parent 93d1ce6 commit 2781e18
Show file tree
Hide file tree
Showing 11 changed files with 349 additions and 28 deletions.
Expand Up @@ -30,7 +30,7 @@ public class ConfigServiceLocator {
private Type m_responseType;

/**
* Create a config service locator
* Create a config service locator.
*/
public ConfigServiceLocator() {
List<ServiceDTO> initial = Lists.newArrayList();
Expand Down Expand Up @@ -88,8 +88,8 @@ private void updateConfigServices() {
throw new RuntimeException("Get config services failed", exception);
}

private void logConfigServicesToCat(List<ServiceDTO> serviceDTOs) {
for (ServiceDTO serviceDTO : serviceDTOs) {
private void logConfigServicesToCat(List<ServiceDTO> serviceDtos) {
for (ServiceDTO serviceDTO : serviceDtos) {
Cat.logEvent("Apollo.Config.Services", serviceDTO.getHomepageUrl());
}
}
Expand Down
Expand Up @@ -95,7 +95,14 @@ public synchronized void onRepositoryChange(String namespace, Properties newProp

Map<String, ConfigChange> actualChanges = updateAndCalcConfigChanges(newConfigProperties);

//check double checked result
if (actualChanges.isEmpty()) {
return;
}

this.fireConfigChange(new ConfigChangeEvent(m_namespace, actualChanges));

Cat.logEvent("Apollo.Client.ConfigChanges", m_namespace);
}

private Map<String, ConfigChange> updateAndCalcConfigChanges(Properties newConfigProperties) {
Expand Down
Expand Up @@ -120,7 +120,9 @@ private void trySyncFromFallback() {
updateFileProperties(properties);
} catch (Throwable ex) {
Cat.logError(ex);
logger.warn("Sync config from fallback repository {} failed, reason: {}", m_fallback.getClass(), ex);
logger
.warn("Sync config from fallback repository {} failed, reason: {}", m_fallback.getClass(),
ex);
}
}

Expand Down
@@ -1,9 +1,14 @@
package com.ctrip.apollo.internals;

import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.escape.Escaper;
import com.google.common.net.UrlEscapers;

import com.ctrip.apollo.core.dto.ApolloConfig;
import com.ctrip.apollo.core.dto.ApolloConfigNotification;
import com.ctrip.apollo.core.dto.ServiceDTO;
import com.ctrip.apollo.core.utils.ApolloThreadFactory;
import com.ctrip.apollo.util.ConfigUtil;
Expand All @@ -22,10 +27,14 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand All @@ -40,6 +49,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
private volatile AtomicReference<ApolloConfig> m_configCache;
private final String m_namespace;
private final ScheduledExecutorService m_executorService;
private final AtomicBoolean m_longPollingStopped;

/**
* Constructor.
Expand All @@ -58,10 +68,12 @@ public RemoteConfigRepository(String namespace) {
Cat.logError(ex);
throw new IllegalStateException("Unable to load component!", ex);
}
this.m_longPollingStopped = new AtomicBoolean(false);
this.m_executorService = Executors.newScheduledThreadPool(1,
ApolloThreadFactory.create("RemoteConfigRepository", true));
this.trySync();
this.schedulePeriodicRefresh();
this.scheduleLongPollingRefresh();
}

@Override
Expand All @@ -84,7 +96,10 @@ private void schedulePeriodicRefresh() {
new Runnable() {
@Override
public void run() {
Transaction transaction = Cat.newTransaction("Apollo.ConfigService", "periodicRefresh");
trySync();
transaction.setStatus(Message.SUCCESS);
transaction.complete();
}
}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
m_configUtil.getRefreshTimeUnit());
Expand Down Expand Up @@ -113,11 +128,11 @@ private Properties transformApolloConfigToProperties(ApolloConfig apolloConfig)
return result;
}


private ApolloConfig loadApolloConfig() {
String appId = m_configUtil.getAppId();
String cluster = m_configUtil.getCluster();
Cat.logEvent("Apollo.Client.Config", String.format("%s-%s-%s", appId, cluster, m_namespace));
Cat.logEvent("Apollo.Client.ConfigInfo",
String.format("%s-%s-%s", appId, cluster, m_namespace));
int maxRetries = 2;
Throwable exception = null;

Expand All @@ -128,7 +143,7 @@ private ApolloConfig loadApolloConfig() {

for (ServiceDTO configService : randomConfigServices) {
String url =
assembleUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
m_configCache.get());

logger.debug("Loading config from {}", url);
Expand Down Expand Up @@ -172,18 +187,19 @@ private ApolloConfig loadApolloConfig() {
throw new RuntimeException(message, exception);
}

private String assembleUrl(String uri, String appId, String cluster, String namespace,
ApolloConfig previousConfig) {
private String assembleQueryConfigUrl(String uri, String appId, String cluster, String namespace,
ApolloConfig previousConfig) {
Escaper escaper = UrlEscapers.urlPathSegmentEscaper();
String path = "configs/%s/%s";
List<String> params = Lists.newArrayList(appId, cluster);
List<String> params = Lists.newArrayList(escaper.escape(appId), escaper.escape(cluster));

if (!Strings.isNullOrEmpty(namespace)) {
path = path + "/%s";
params.add(namespace);
params.add(escaper.escape(namespace));
}
if (previousConfig != null) {
path = path + "?releaseId=%s";
params.add(String.valueOf(previousConfig.getReleaseId()));
params.add(escaper.escape(String.valueOf(previousConfig.getReleaseId())));
}

String pathExpanded = String.format(path, params.toArray());
Expand All @@ -193,6 +209,106 @@ private String assembleUrl(String uri, String appId, String cluster, String name
return uri + pathExpanded;
}

private void scheduleLongPollingRefresh() {
final String appId = m_configUtil.getAppId();
final String cluster = m_configUtil.getCluster();
final ExecutorService longPollingService =
Executors.newFixedThreadPool(2,
ApolloThreadFactory.create("RemoteConfigRepository-LongPolling", true));
longPollingService.submit(new Runnable() {
@Override
public void run() {
doLongPollingRefresh(appId, cluster, longPollingService);
}
});
}

private void doLongPollingRefresh(String appId, String cluster,
ExecutorService longPollingService) {
final Random random = new Random();
ServiceDTO lastServiceDto = null;
Transaction transaction = null;
while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
try {
if (lastServiceDto == null) {
List<ServiceDTO> configServices = getConfigServices();
lastServiceDto = configServices.get(random.nextInt(configServices.size()));
}

String url =
assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster,
m_namespace, m_configCache.get());

logger.debug("Long polling from {}", url);
HttpRequest request = new HttpRequest(url);
//no timeout for read
request.setReadTimeout(0);

transaction = Cat.newTransaction("Apollo.ConfigService", "pollNotification");
transaction.addData("Url", url);

HttpResponse<ApolloConfigNotification> response =
m_httpUtil.doGet(request, ApolloConfigNotification.class);

logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
if (response.getStatusCode() == 200) {
longPollingService.submit(new Runnable() {
@Override
public void run() {
trySync();
}
});
}
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Message.SUCCESS);
} catch (Throwable ex) {
logger.warn("Long polling failed for appId: {}, cluster: {}, namespace: {}, reason: {}",
appId, cluster, m_namespace, ex);
lastServiceDto = null;
Cat.logError(ex);
if (transaction != null) {
transaction.setStatus(ex);
}
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException ie) {
//ignore
}
} finally {
if (transaction != null) {
transaction.complete();
}
}
}
}

private String assembleLongPollRefreshUrl(String uri, String appId, String cluster,
String namespace,
ApolloConfig previousConfig) {
Escaper escaper = UrlEscapers.urlPathSegmentEscaper();
Map<String, String> queryParams = Maps.newHashMap();
queryParams.put("appId", escaper.escape(appId));
queryParams.put("cluster", escaper.escape(cluster));

if (!Strings.isNullOrEmpty(namespace)) {
queryParams.put("namespace", escaper.escape(namespace));
}
if (previousConfig != null) {
queryParams.put("releaseId", escaper.escape(previousConfig.getReleaseId()));
}

String params = Joiner.on("&").withKeyValueSeparator("=").join(queryParams);
if (!uri.endsWith("/")) {
uri += "/";
}

return uri + "notifications?" + params;
}

void stopLongPollingRefresh() {
this.m_longPollingStopped.compareAndSet(false, true);
}

private List<ServiceDTO> getConfigServices() {
List<ServiceDTO> services = m_serviceLocator.getConfigServices();
if (services.size() == 0) {
Expand Down
Expand Up @@ -74,5 +74,7 @@ public String apply(ConfigChange input) {
m_configProperties = newConfigProperties;

this.fireConfigChange(new ConfigChangeEvent(m_namespace, changeMap));

Cat.logEvent("Apollo.Client.ConfigChanges", m_namespace);
}
}
Expand Up @@ -40,7 +40,8 @@ public ConfigChange getChange(String key) {
}

/**
* Get the changes. Please note that the returned Map is immutable.
* Get the changes as <Key, Change> map.
* Please note that the returned Map is immutable.
* @return changes
*/
public Map<String, ConfigChange> getChanges() {
Expand Down
Expand Up @@ -27,20 +27,23 @@ public class HttpUtil {
private ConfigUtil m_configUtil;
private Gson gson;
private String basicAuth;


/**
* Constructor.
*/
public HttpUtil() {
gson = new Gson();
try {
basicAuth = "Basic " + BaseEncoding.base64().encode("user:".getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
basicAuth = "Basic " + BaseEncoding.base64().encode("user:".getBytes("UTF-8"));
} catch (UnsupportedEncodingException ex) {
ex.printStackTrace();
}
}

/**
* Do get operation for the http request.
*
* @param httpRequest the request
* @param httpRequest the request
* @param responseType the response type
* @return the response
* @throws RuntimeException if any error happened or response code is neither 200 nor 304
Expand All @@ -59,7 +62,7 @@ public T apply(String input) {
/**
* Do get operation for the http request.
*
* @param httpRequest the request
* @param httpRequest the request
* @param responseType the response type
* @return the response
* @throws RuntimeException if any error happened or response code is neither 200 nor 304
Expand All @@ -76,14 +79,14 @@ public T apply(String input) {
}

private <T> HttpResponse<T> doGetWithSerializeFunction(HttpRequest httpRequest,
Function<String, T> serializeFunction) {
Function<String, T> serializeFunction) {
InputStream is = null;
try {
HttpURLConnection conn = (HttpURLConnection) new URL(httpRequest.getUrl()).openConnection();

conn.setRequestMethod("GET");
conn.setRequestProperty ("Authorization", basicAuth);
conn.setRequestProperty("Authorization", basicAuth);

int connectTimeout = httpRequest.getConnectTimeout();
if (connectTimeout < 0) {
connectTimeout = m_configUtil.getConnectTimeout();
Expand Down

0 comments on commit 2781e18

Please sign in to comment.