Skip to content

Commit

Permalink
Refactor logging and add meta service refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
nobodyiam committed Apr 20, 2016
1 parent 24db89d commit 055d051
Show file tree
Hide file tree
Showing 13 changed files with 99 additions and 16 deletions.
Expand Up @@ -31,8 +31,8 @@ public List<Component> defineComponents() {
all.add(A(DefaultConfigRegistry.class));
all.add(A(DefaultConfigFactoryManager.class));
all.add(A(ConfigUtil.class));
all.add(A(ConfigServiceLocator.class));
all.add(A(HttpUtil.class));
all.add(A(ConfigServiceLocator.class));

return all;
}
Expand Down
Expand Up @@ -9,6 +9,7 @@
import com.ctrip.apollo.enums.PropertyChangeType;
import com.ctrip.apollo.model.ConfigChange;
import com.ctrip.apollo.model.ConfigChangeEvent;
import com.ctrip.apollo.util.ExceptionUtil;
import com.dianping.cat.Cat;

import org.slf4j.Logger;
Expand Down Expand Up @@ -38,7 +39,8 @@ protected void fireConfigChange(ConfigChangeEvent changeEvent) {
listener.onChange(changeEvent);
} catch (Throwable ex) {
Cat.logError(ex);
logger.error("Failed to invoke config change listener {}", listener.getClass(), ex);
logger.error("Failed to invoke config change listener {}", listener.getClass(), ExceptionUtil
.getDetailMessage(ex));
}
}
}
Expand Down
Expand Up @@ -2,6 +2,7 @@

import com.google.common.collect.Lists;

import com.ctrip.apollo.util.ExceptionUtil;
import com.dianping.cat.Cat;

import org.slf4j.Logger;
Expand All @@ -22,7 +23,9 @@ protected void trySync() {
sync();
} catch (Throwable ex) {
Cat.logError(ex);
logger.warn("Sync config failed with repository {}, reason: {}", this.getClass(), ex);
logger
.warn("Sync config failed with repository {}, reason: {}", this.getClass(), ExceptionUtil
.getDetailMessage(ex));
}
}

Expand All @@ -46,7 +49,9 @@ protected void fireRepositoryChange(String namespace, Properties newProperties)
listener.onRepositoryChange(namespace, newProperties);
} catch (Throwable ex) {
Cat.logError(ex);
logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex);
logger.error("Failed to invoke repository change listener {}", listener.getClass(),
ExceptionUtil
.getDetailMessage(ex));
}
}
}
Expand Down
Expand Up @@ -4,6 +4,7 @@
import com.google.gson.reflect.TypeToken;

import com.ctrip.apollo.core.dto.ServiceDTO;
import com.ctrip.apollo.core.utils.ApolloThreadFactory;
import com.ctrip.apollo.util.ConfigUtil;
import com.ctrip.apollo.util.http.HttpRequest;
import com.ctrip.apollo.util.http.HttpResponse;
Expand All @@ -12,22 +13,30 @@
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;

import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.unidal.lookup.annotation.Inject;
import org.unidal.lookup.annotation.Named;

import java.lang.reflect.Type;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

@Named(type = ConfigServiceLocator.class)
public class ConfigServiceLocator {
public class ConfigServiceLocator implements Initializable{
private static final Logger logger = LoggerFactory.getLogger(ConfigServiceLocator.class);
@Inject
private HttpUtil m_httpUtil;
@Inject
private ConfigUtil m_configUtil;
private AtomicReference<List<ServiceDTO>> m_configServices;
private Type m_responseType;
private ScheduledExecutorService m_executorService;

/**
* Create a config service locator.
Expand All @@ -37,6 +46,14 @@ public ConfigServiceLocator() {
m_configServices = new AtomicReference<>(initial);
m_responseType = new TypeToken<List<ServiceDTO>>() {
}.getType();
this.m_executorService = Executors.newScheduledThreadPool(1,
ApolloThreadFactory.create("ConfigServiceLocator", true));
}

@Override
public void initialize() throws InitializationException {
this.tryUpdateConfigServices();
this.schedulePeriodicRefresh();
}

/**
Expand All @@ -52,8 +69,31 @@ public List<ServiceDTO> getConfigServices() {
return m_configServices.get();
}

private void tryUpdateConfigServices() {
try {
updateConfigServices();
} catch (Throwable ex) {
//ignore
}
}

private void schedulePeriodicRefresh() {
this.m_executorService.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
logger.debug("refresh config services");
Transaction transaction = Cat.newTransaction("Apollo.MetaService", "periodicRefresh");
tryUpdateConfigServices();
transaction.setStatus(Message.SUCCESS);
transaction.complete();
}
}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
m_configUtil.getRefreshTimeUnit());
}

//TODO periodically update config services
private void updateConfigServices() {
private synchronized void updateConfigServices() {
String domainName = m_configUtil.getMetaServerDomainName();
String url = domainName + "/services/config";

Expand Down
Expand Up @@ -6,6 +6,7 @@
import com.ctrip.apollo.enums.PropertyChangeType;
import com.ctrip.apollo.model.ConfigChange;
import com.ctrip.apollo.model.ConfigChangeEvent;
import com.ctrip.apollo.util.ExceptionUtil;
import com.dianping.cat.Cat;

import org.slf4j.Logger;
Expand Down Expand Up @@ -50,7 +51,7 @@ private void initialize() {
} catch (Throwable ex) {
Cat.logError(ex);
logger.warn("Init Apollo Local Config failed - namespace: {}, reason: {}.",
m_namespace, ex);
m_namespace, ExceptionUtil.getDetailMessage(ex));
}
}

Expand Down
Expand Up @@ -3,6 +3,7 @@
import com.google.common.base.Preconditions;

import com.ctrip.apollo.util.ConfigUtil;
import com.ctrip.apollo.util.ExceptionUtil;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
Expand Down Expand Up @@ -122,7 +123,7 @@ private void trySyncFromFallback() {
Cat.logError(ex);
logger
.warn("Sync config from fallback repository {} failed, reason: {}", m_fallback.getClass(),
ex);
ExceptionUtil.getDetailMessage(ex));
}
}

Expand Down Expand Up @@ -186,7 +187,8 @@ void persistLocalCacheFile(File baseDir, String namespace) {
} catch (IOException ex) {
Cat.logError(ex);
transaction.setStatus(ex);
logger.warn("Persist local cache file {} failed, reason: {}.", file.getAbsolutePath(), ex);
logger.warn("Persist local cache file {} failed, reason: {}.", file.getAbsolutePath(),
ExceptionUtil.getDetailMessage(ex));
} finally {
if (out != null) {
try {
Expand Down
Expand Up @@ -12,6 +12,7 @@
import com.ctrip.apollo.core.dto.ServiceDTO;
import com.ctrip.apollo.core.utils.ApolloThreadFactory;
import com.ctrip.apollo.util.ConfigUtil;
import com.ctrip.apollo.util.ExceptionUtil;
import com.ctrip.apollo.util.http.HttpRequest;
import com.ctrip.apollo.util.http.HttpResponse;
import com.ctrip.apollo.util.http.HttpUtil;
Expand Down Expand Up @@ -96,6 +97,7 @@ private void schedulePeriodicRefresh() {
new Runnable() {
@Override
public void run() {
logger.debug("refresh config for namespace: {}", m_namespace);
Transaction transaction = Cat.newTransaction("Apollo.ConfigService", "periodicRefresh");
trySync();
transaction.setStatus(Message.SUCCESS);
Expand Down Expand Up @@ -262,8 +264,8 @@ public void run() {
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Message.SUCCESS);
} catch (Throwable ex) {
logger.warn("Long polling failed for appId: {}, cluster: {}, namespace: {}, reason: {}",
appId, cluster, m_namespace, ex);
logger.warn("Long polling failed for appId: {}, cluster: {}, namespace: {}",
appId, cluster, m_namespace, ExceptionUtil.getDetailMessage(ex));
lastServiceDto = null;
Cat.logError(ex);
if (transaction != null) {
Expand Down
Expand Up @@ -5,6 +5,7 @@

import com.ctrip.apollo.model.ConfigChange;
import com.ctrip.apollo.model.ConfigChangeEvent;
import com.ctrip.apollo.util.ExceptionUtil;
import com.dianping.cat.Cat;

import org.slf4j.Logger;
Expand Down Expand Up @@ -41,7 +42,8 @@ private void initialize() {
m_configRepository.addChangeListener(this);
} catch (Throwable ex) {
Cat.logError(ex);
logger.warn("Init Apollo Simple Config failed - namespace: {}, reason: {}", m_namespace, ex);
logger.warn("Init Apollo Simple Config failed - namespace: {}, reason: {}", m_namespace,
ExceptionUtil.getDetailMessage(ex));
}
}

Expand Down
Expand Up @@ -5,6 +5,7 @@
import com.ctrip.apollo.internals.DefaultConfig;
import com.ctrip.apollo.internals.LocalFileConfigRepository;
import com.ctrip.apollo.internals.RemoteConfigRepository;
import com.ctrip.apollo.util.ExceptionUtil;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
Expand Down Expand Up @@ -48,7 +49,7 @@ private void checkLocalConfigCacheDir(File baseDir) {
transaction.setStatus(ex);
logger.warn(
"Unable to create local config cache directory {}, reason: {}. Will not able to cache config file.",
baseDir, ex);
baseDir, ExceptionUtil.getDetailMessage(ex));
} finally {
transaction.complete();
}
Expand Down
@@ -0,0 +1,16 @@
package com.ctrip.apollo.util;

/**
* @author Jason Song(song_s@ctrip.com)
*/
public class ExceptionUtil {
public static String getDetailMessage(Throwable ex) {
if (ex == null) {
return "";
}
if (ex.getCause() != null) {
return String.format("%s [Cause: %s]", ex.getMessage(), getDetailMessage(ex.getCause()));
}
return ex.getMessage();
}
}
Expand Up @@ -228,12 +228,13 @@ public void testLongPollRefresh() throws Exception {
final String someValue = "someValue";
final String anotherValue = "anotherValue";

long pollTimeoutInMS = 50;
Map<String, String> configurations = Maps.newHashMap();
configurations.put(someKey, someValue);
ApolloConfig apolloConfig = assembleApolloConfig(configurations);
ContextHandler configHandler = mockConfigServerHandler(HttpServletResponse.SC_OK, apolloConfig);
ContextHandler pollHandler =
mockPollNotificationHandler(50, HttpServletResponse.SC_OK,
mockPollNotificationHandler(pollTimeoutInMS, HttpServletResponse.SC_OK,
new ApolloConfigNotification(apolloConfig.getAppId(), apolloConfig.getCluster(),
apolloConfig.getNamespace()), false);

Expand All @@ -244,7 +245,7 @@ public void testLongPollRefresh() throws Exception {

apolloConfig.getConfigurations().put(someKey, anotherValue);

TimeUnit.MILLISECONDS.sleep(60);
TimeUnit.MILLISECONDS.sleep(pollTimeoutInMS * 3);

assertEquals(anotherValue, config.getProperty(someKey, null));

Expand Down
Expand Up @@ -12,6 +12,7 @@
import com.ctrip.apollo.util.http.HttpResponse;
import com.ctrip.apollo.util.http.HttpUtil;

import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -181,6 +182,11 @@ public List<ServiceDTO> getConfigServices() {
when(serviceDTO.getHomepageUrl()).thenReturn(someServerUrl);
return Lists.newArrayList(serviceDTO);
}

@Override
public void initialize() throws InitializationException {
//do nothing
}
}

public static class MockHttpUtil extends HttpUtil {
Expand Down
Expand Up @@ -30,9 +30,11 @@
@RequestMapping("/notifications")
public class NotificationController {
private static final Logger logger = LoggerFactory.getLogger(NotificationController.class);
private final static long TIMEOUT = 60 * 60 * 1000;//60 MINUTES
private final static long TIMEOUT = 120 * 60 * 1000;//120 MINUTES
private final Multimap<String, DeferredResult<ApolloConfigNotification>> deferredResults =
Multimaps.synchronizedSetMultimap(HashMultimap.create());
private final Multimap<DeferredResult<ApolloConfigNotification>, String> deferredResultReversed =
Multimaps.synchronizedSetMultimap(HashMultimap.create());

{
startRandomChange();
Expand All @@ -43,12 +45,15 @@ public DeferredResult<ApolloConfigNotification> pollNotification(
@RequestParam(value = "appId") String appId,
@RequestParam(value = "cluster") String cluster,
@RequestParam(value = "namespace", defaultValue = ConfigConsts.NAMESPACE_APPLICATION) String namespace,
@RequestParam(value = "datacenter", required = false) String datacenter,
@RequestParam(value = "releaseId", defaultValue = "-1") String clientSideReleaseId,
HttpServletResponse response) {
DeferredResult<ApolloConfigNotification> deferredResult =
new DeferredResult<>(TIMEOUT);
String key = assembleKey(appId, cluster, namespace);
this.deferredResults.put(key, deferredResult);
//to record all the keys related to deferredResult
this.deferredResultReversed.put(deferredResult, key);

deferredResult.onCompletion(() -> {
logger.info("deferred result for {} {} {} completed", appId, cluster, namespace);
Expand Down

0 comments on commit 055d051

Please sign in to comment.