Skip to content

Commit

Permalink
Add notification id for long poll
Browse files Browse the repository at this point in the history
  • Loading branch information
nobodyiam committed Jun 1, 2016
1 parent cdcd35a commit 52e88f6
Show file tree
Hide file tree
Showing 17 changed files with 289 additions and 74 deletions.
@@ -1,5 +1,7 @@
package com.ctrip.framework.apollo.biz.entity;

import com.google.common.base.MoreObjects;

import java.util.Date;

import javax.persistence.Column;
Expand Down Expand Up @@ -55,4 +57,14 @@ public String getMessage() {
public void setMessage(String message) {
this.message = message;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.omitNullValues()
.add("id", id)
.add("message", message)
.add("dataChangeLastModifiedTime", dataChangeLastModifiedTime)
.toString();
}
}

This file was deleted.

@@ -0,0 +1,10 @@
package com.ctrip.framework.apollo.biz.message;

import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;

/**
* @author Jason Song(song_s@ctrip.com)
*/
public interface ReleaseMessageListener {
void handleMessage(ReleaseMessage message, String channel);
}
Expand Up @@ -33,7 +33,7 @@ public class ReleaseMessageScanner implements InitializingBean {
@Autowired
private ReleaseMessageRepository releaseMessageRepository;
private int databaseScanInterval;
private List<MessageListener> listeners;
private List<ReleaseMessageListener> listeners;
private ScheduledExecutorService executorService;
private long maxIdScanned;

Expand Down Expand Up @@ -66,7 +66,7 @@ public void afterPropertiesSet() throws Exception {
* add message listeners for release message
* @param listener
*/
public void addMessageListener(MessageListener listener) {
public void addMessageListener(ReleaseMessageListener listener) {
if (!listeners.contains(listener)) {
listeners.add(listener);
}
Expand Down Expand Up @@ -115,9 +115,9 @@ private long loadLargestMessageId() {
*/
private void fireMessageScanned(List<ReleaseMessage> messages) {
for (ReleaseMessage message : messages) {
for (MessageListener listener : listeners) {
for (ReleaseMessageListener listener : listeners) {
try {
listener.handleMessage(message.getMessage(), Topics.APOLLO_RELEASE_TOPIC);
listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
} catch (Throwable ex) {
Cat.logError(ex);
logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
Expand Down
Expand Up @@ -4,6 +4,7 @@

import org.springframework.data.repository.PagingAndSortingRepository;

import java.util.Collection;
import java.util.List;

/**
Expand All @@ -13,4 +14,6 @@ public interface ReleaseMessageRepository extends PagingAndSortingRepository<Rel
List<ReleaseMessage> findFirst500ByIdGreaterThanOrderByIdAsc(Long id);

ReleaseMessage findTopByOrderByIdDesc();

ReleaseMessage findTopByMessageInOrderByIdDesc(Collection<String> messages);
}
@@ -0,0 +1,22 @@
package com.ctrip.framework.apollo.biz.service;

import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Collection;

/**
* @author Jason Song(song_s@ctrip.com)
*/
@Service
public class ReleaseMessageService {
@Autowired
private ReleaseMessageRepository releaseMessageRepository;

public ReleaseMessage findLatestReleaseMessageForMessages(Collection<String> messages) {
return releaseMessageRepository.findTopByMessageInOrderByIdDesc(messages);
}
}
Expand Up @@ -44,8 +44,8 @@ public void setUp() throws Exception {

@Test
public void testScanMessageAndNotifyMessageListener() throws Exception {
SettableFuture<String> someListenerFuture = SettableFuture.create();
MessageListener someListener = (message, channel) -> someListenerFuture.set(message);
SettableFuture<ReleaseMessage> someListenerFuture = SettableFuture.create();
ReleaseMessageListener someListener = (message, channel) -> someListenerFuture.set(message);
releaseMessageScanner.addMessageListener(someListener);

String someMessage = "someMessage";
Expand All @@ -55,13 +55,14 @@ public void testScanMessageAndNotifyMessageListener() throws Exception {
when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(0L)).thenReturn(
Lists.newArrayList(someReleaseMessage));

String someListenerMessage =
ReleaseMessage someListenerMessage =
someListenerFuture.get(5000, TimeUnit.MILLISECONDS);

assertEquals(someMessage, someListenerMessage);
assertEquals(someMessage, someListenerMessage.getMessage());
assertEquals(someId, someListenerMessage.getId());

SettableFuture<String> anotherListenerFuture = SettableFuture.create();
MessageListener anotherListener = (message, channel) -> anotherListenerFuture.set(message);
SettableFuture<ReleaseMessage> anotherListenerFuture = SettableFuture.create();
ReleaseMessageListener anotherListener = (message, channel) -> anotherListenerFuture.set(message);
releaseMessageScanner.addMessageListener(anotherListener);

String anotherMessage = "anotherMessage";
Expand All @@ -71,10 +72,11 @@ public void testScanMessageAndNotifyMessageListener() throws Exception {
when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(someId)).thenReturn(
Lists.newArrayList(anotherReleaseMessage));

String anotherListenerMessage =
ReleaseMessage anotherListenerMessage =
anotherListenerFuture.get(5000, TimeUnit.MILLISECONDS);

assertEquals(anotherMessage, anotherListenerMessage);
assertEquals(anotherMessage, anotherListenerMessage.getMessage());
assertEquals(anotherId, anotherListenerMessage.getId());

}

Expand Down
Expand Up @@ -59,6 +59,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
private final AtomicBoolean m_longPollingStopped;
private SchedulePolicy m_longPollSchedulePolicy;
private AtomicReference<ServiceDTO> m_longPollServiceDto;
private AtomicReference<ApolloConfigNotification> m_longPollResult;

/**
* Constructor.
Expand All @@ -82,8 +83,9 @@ public RemoteConfigRepository(String namespace) {
m_executorService = Executors.newScheduledThreadPool(1,
ApolloThreadFactory.create("RemoteConfigRepository", true));
m_longPollingService = Executors.newFixedThreadPool(2,
ApolloThreadFactory.create("RemoteConfigRepository-LongPolling", true));
ApolloThreadFactory.create("RemoteConfigRepository-LongPolling", true));
m_longPollServiceDto = new AtomicReference<>();
m_longPollResult = new AtomicReference<>();
this.trySync();
this.schedulePeriodicRefresh();
this.scheduleLongPollingRefresh();
Expand Down Expand Up @@ -270,7 +272,7 @@ private void doLongPollingRefresh(String appId, String cluster, String dataCente

String url =
assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster,
m_namespace, dataCenter);
m_namespace, dataCenter, m_longPollResult.get());

logger.debug("Long polling from {}", url);
HttpRequest request = new HttpRequest(url);
Expand All @@ -286,6 +288,10 @@ private void doLongPollingRefresh(String appId, String cluster, String dataCente
logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
if (response.getStatusCode() == 200) {
m_longPollServiceDto.set(lastServiceDto);
if (response.getBody() != null) {
m_longPollResult.set(response.getBody());
transaction.addData("Result", response.getBody().toString());
}
longPollingService.submit(new Runnable() {
@Override
public void run() {
Expand Down Expand Up @@ -320,7 +326,8 @@ public void run() {
}

private String assembleLongPollRefreshUrl(String uri, String appId, String cluster,
String namespace, String dataCenter) {
String namespace, String dataCenter,
ApolloConfigNotification previousResult) {
Escaper escaper = UrlEscapers.urlPathSegmentEscaper();
Map<String, String> queryParams = Maps.newHashMap();
queryParams.put("appId", escaper.escape(appId));
Expand All @@ -337,6 +344,11 @@ private String assembleLongPollRefreshUrl(String uri, String appId, String clust
queryParams.put("ip", escaper.escape(localIp));
}

if (previousResult != null) {
//number doesn't need encode
queryParams.put("notificationId", String.valueOf(previousResult.getNotificationId()));
}

String params = MAP_JOINER.join(queryParams);
if (!uri.endsWith("/")) {
uri += "/";
Expand Down
Expand Up @@ -234,6 +234,7 @@ public void testLongPollRefresh() throws Exception {
final String someKey = "someKey";
final String someValue = "someValue";
final String anotherValue = "anotherValue";
long someNotificationId = 1;

long pollTimeoutInMS = 50;
Map<String, String> configurations = Maps.newHashMap();
Expand All @@ -242,7 +243,7 @@ public void testLongPollRefresh() throws Exception {
ContextHandler configHandler = mockConfigServerHandler(HttpServletResponse.SC_OK, apolloConfig);
ContextHandler pollHandler =
mockPollNotificationHandler(pollTimeoutInMS, HttpServletResponse.SC_OK,
new ApolloConfigNotification(apolloConfig.getNamespaceName()), false);
new ApolloConfigNotification(apolloConfig.getNamespaceName(), someNotificationId), false);

startServerWithHandlers(configHandler, pollHandler);

Expand Down
Expand Up @@ -10,9 +10,11 @@
import com.google.common.collect.Sets;

import com.ctrip.framework.apollo.biz.entity.AppNamespace;
import com.ctrip.framework.apollo.biz.message.MessageListener;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.message.ReleaseMessageListener;
import com.ctrip.framework.apollo.biz.message.Topics;
import com.ctrip.framework.apollo.biz.service.AppNamespaceService;
import com.ctrip.framework.apollo.biz.service.ReleaseMessageService;
import com.ctrip.framework.apollo.biz.utils.EntityManagerUtil;
import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
Expand All @@ -38,7 +40,7 @@
*/
@RestController
@RequestMapping("/notifications")
public class NotificationController implements MessageListener {
public class NotificationController implements ReleaseMessageListener {
private static final Logger logger = LoggerFactory.getLogger(NotificationController.class);
private static final long TIMEOUT = 30 * 1000;//30 seconds
private final Multimap<String, DeferredResult<ResponseEntity<ApolloConfigNotification>>>
Expand All @@ -52,6 +54,9 @@ public class NotificationController implements MessageListener {
@Autowired
private AppNamespaceService appNamespaceService;

@Autowired
private ReleaseMessageService releaseMessageService;

@Autowired
private EntityManagerUtil entityManagerUtil;

Expand All @@ -61,6 +66,7 @@ public DeferredResult<ResponseEntity<ApolloConfigNotification>> pollNotification
@RequestParam(value = "cluster") String cluster,
@RequestParam(value = "namespace", defaultValue = ConfigConsts.NAMESPACE_DEFAULT) String namespace,
@RequestParam(value = "dataCenter", required = false) String dataCenter,
@RequestParam(value = "notificationId", defaultValue = "-1") long notificationId,
@RequestParam(value = "ip", required = false) String clientIp) {
Set<String> watchedKeys = assembleWatchKeys(appId, cluster, namespace, dataCenter);

Expand All @@ -72,24 +78,42 @@ public DeferredResult<ResponseEntity<ApolloConfigNotification>> pollNotification
DeferredResult<ResponseEntity<ApolloConfigNotification>> deferredResult =
new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE);

//register all keys
for (String key : watchedKeys) {
this.deferredResults.put(key, deferredResult);
}
//check whether client is out-dated
ReleaseMessage latest = releaseMessageService.findLatestReleaseMessageForMessages(watchedKeys);

deferredResult.onTimeout(() -> logWatchedKeysToCat(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));
/**
* Manually close the entity manager.
* Since for async request, Spring won't do so until the request is finished,
* which is unacceptable since we are doing long polling - means the db connection would be hold
* for a very long time
*/
entityManagerUtil.closeEntityManager();

deferredResult.onCompletion(() -> {
//unregister all keys
if (latest != null && latest.getId() != notificationId) {
deferredResult.setResult(new ResponseEntity<>(
new ApolloConfigNotification(namespace, latest.getId()), HttpStatus.OK));
} else {
//register all keys
for (String key : watchedKeys) {
deferredResults.remove(key, deferredResult);
this.deferredResults.put(key, deferredResult);
}
logWatchedKeysToCat(watchedKeys, "Apollo.LongPoll.CompletedKeys");
});

logWatchedKeysToCat(watchedKeys, "Apollo.LongPoll.RegisteredKeys");
logger.info("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}",
watchedKeys, appId, cluster, namespace, dataCenter);
deferredResult
.onTimeout(() -> logWatchedKeysToCat(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));

deferredResult.onCompletion(() -> {
//unregister all keys
for (String key : watchedKeys) {
deferredResults.remove(key, deferredResult);
}
logWatchedKeysToCat(watchedKeys, "Apollo.LongPoll.CompletedKeys");
});

logWatchedKeysToCat(watchedKeys, "Apollo.LongPoll.RegisteredKeys");
logger.info("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}",
watchedKeys, appId, cluster, namespace, dataCenter);
}

return deferredResult;
}

Expand All @@ -101,13 +125,6 @@ private Set<String> findPublicConfigWatchKey(String applicationId, String cluste
String namespace,
String dataCenter) {
AppNamespace appNamespace = appNamespaceService.findByNamespaceName(namespace);
/**
* Manually close the entity manager.
* Since for async request, Spring won't do so until the request is finished,
* which is unacceptable since we are doing long polling - means the db connection would be hold
* for a very long time
*/
entityManagerUtil.closeEntityManager();

//check whether the namespace's appId equals to current one
if (Objects.isNull(appNamespace) || Objects.equals(applicationId, appNamespace.getAppId())) {
Expand Down Expand Up @@ -140,27 +157,29 @@ private Set<String> assembleWatchKeys(String appId, String clusterName, String n
}

@Override
public void handleMessage(String message, String channel) {
public void handleMessage(ReleaseMessage message, String channel) {
logger.info("message received - channel: {}, message: {}", channel, message);
Cat.logEvent("Apollo.LongPoll.Messages", message);
if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(message)) {

String content = message.getMessage();
Cat.logEvent("Apollo.LongPoll.Messages", content);
if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
return;
}
List<String> keys = STRING_SPLITTER.splitToList(message);
//message should be appId|cluster|namespace
List<String> keys = STRING_SPLITTER.splitToList(content);
//message should be appId+cluster+namespace
if (keys.size() != 3) {
logger.error("message format invalid - {}", message);
logger.error("message format invalid - {}", content);
return;
}

ResponseEntity<ApolloConfigNotification> notification =
new ResponseEntity<>(
new ApolloConfigNotification(keys.get(2)), HttpStatus.OK);
new ApolloConfigNotification(keys.get(2), message.getId()), HttpStatus.OK);

//create a new list to avoid ConcurrentModificationException
List<DeferredResult<ResponseEntity<ApolloConfigNotification>>> results =
Lists.newArrayList(deferredResults.get(message));
logger.info("Notify {} clients for key {}", results.size(), message);
Lists.newArrayList(deferredResults.get(content));
logger.info("Notify {} clients for key {}", results.size(), content);

for (DeferredResult<ResponseEntity<ApolloConfigNotification>> result : results) {
result.setResult(notification);
Expand Down

0 comments on commit 52e88f6

Please sign in to comment.