Skip to content

Commit

Permalink
fix the issue that release messages might be missed in certain scenarios
Browse files Browse the repository at this point in the history
  • Loading branch information
nobodyiam committed Jul 11, 2021
1 parent 72a0498 commit dae4af2
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Expand Up @@ -56,6 +56,7 @@ Apollo 1.9.0
* [set default session store-type](https://github.com/ctripcorp/apollo/pull/3812)
* [speed up the stale issue mark and close phase](https://github.com/ctripcorp/apollo/pull/3808)
* [feature: add the delegating password encoder for apollo-portal simple auth](https://github.com/ctripcorp/apollo/pull/3804)
* [fix the issue that release messages might be missed in certain scenarios #3819](https://github.com/ctripcorp/apollo/pull/3819)
------------------
All issues and pull requests are [here](https://github.com/ctripcorp/apollo/milestone/6?closed=1)

Expand Up @@ -16,7 +16,12 @@
*/
package com.ctrip.framework.apollo.biz.message;

import com.google.common.collect.Maps;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -40,6 +45,7 @@
*/
public class ReleaseMessageScanner implements InitializingBean {
private static final Logger logger = LoggerFactory.getLogger(ReleaseMessageScanner.class);
private static final int missingReleaseMessageMaxAge = 10; // hardcoded to 10, could be configured via BizConfig if necessary
@Autowired
private BizConfig bizConfig;
@Autowired
Expand All @@ -48,6 +54,7 @@ public class ReleaseMessageScanner implements InitializingBean {
private List<ReleaseMessageListener> listeners;
private ScheduledExecutorService executorService;
private long maxIdScanned;
private Map<Long, Integer> missingReleaseMessages; // missing release message id => age counter

public ReleaseMessageScanner() {
listeners = Lists.newCopyOnWriteArrayList();
Expand All @@ -59,10 +66,12 @@ public ReleaseMessageScanner() {
public void afterPropertiesSet() throws Exception {
databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
maxIdScanned = loadLargestMessageId();
missingReleaseMessages = Maps.newHashMap();
executorService.scheduleWithFixedDelay(() -> {
Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
try {
scanMessages();
scanMissingMessages();
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
Expand Down Expand Up @@ -108,10 +117,51 @@ private boolean scanAndSendMessages() {
}
fireMessageScanned(releaseMessages);
int messageScanned = releaseMessages.size();
maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
long newMaxIdScanned = releaseMessages.get(messageScanned - 1).getId();
// check id gaps, possible reasons are release message not committed yet or already rolled back
if (newMaxIdScanned - maxIdScanned > messageScanned) {
recordMissingReleaseMessageIds(releaseMessages, maxIdScanned);
}
maxIdScanned = newMaxIdScanned;
return messageScanned == 500;
}

private void scanMissingMessages() {
Set<Long> missingReleaseMessageIds = missingReleaseMessages.keySet();
Iterable<ReleaseMessage> releaseMessages = releaseMessageRepository
.findAllById(missingReleaseMessageIds);
fireMessageScanned(releaseMessages);
releaseMessages.forEach(releaseMessage -> {
missingReleaseMessageIds.remove(releaseMessage.getId());
});
growAndCleanMissingMessages();
}

private void growAndCleanMissingMessages() {
Iterator<Entry<Long, Integer>> iterator = missingReleaseMessages.entrySet()
.iterator();
while (iterator.hasNext()) {
Entry<Long, Integer> entry = iterator.next();
if (entry.getValue() > missingReleaseMessageMaxAge) {
iterator.remove();
} else {
entry.setValue(entry.getValue() + 1);
}
}
}

private void recordMissingReleaseMessageIds(List<ReleaseMessage> messages, long startId) {
for (ReleaseMessage message : messages) {
long currentId = message.getId();
if (currentId - startId > 1) {
for (long i = startId + 1; i < currentId; i++) {
missingReleaseMessages.putIfAbsent(i, 1);
}
}
startId = currentId;
}
}

/**
* find largest message id as the current start point
* @return current largest message id
Expand All @@ -125,7 +175,7 @@ private long loadLargestMessageId() {
* Notify listeners with messages loaded
* @param messages
*/
private void fireMessageScanned(List<ReleaseMessage> messages) {
private void fireMessageScanned(Iterable<ReleaseMessage> messages) {
for (ReleaseMessage message : messages) {
for (ReleaseMessageListener listener : listeners) {
try {
Expand Down
Expand Up @@ -18,20 +18,25 @@

import com.ctrip.framework.apollo.biz.config.BizConfig;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.SettableFuture;

import com.ctrip.framework.apollo.biz.AbstractUnitTest;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository;

import java.util.ArrayList;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.springframework.test.util.ReflectionTestUtils;

import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.mockito.Mockito.when;

/**
Expand All @@ -54,6 +59,10 @@ public void setUp() throws Exception {
databaseScanInterval = 100; //100 ms
when(bizConfig.releaseMessageScanIntervalInMilli()).thenReturn(databaseScanInterval);
releaseMessageScanner.afterPropertiesSet();

Awaitility.reset();
Awaitility.setDefaultTimeout(databaseScanInterval * 5, TimeUnit.MILLISECONDS);
Awaitility.setDefaultPollInterval(databaseScanInterval, TimeUnit.MILLISECONDS);
}

@Test
Expand Down Expand Up @@ -91,7 +100,86 @@ public void testScanMessageAndNotifyMessageListener() throws Exception {

assertEquals(anotherMessage, anotherListenerMessage.getMessage());
assertEquals(anotherId, anotherListenerMessage.getId());
}

@Test
public void testScanMessageWithGapAndNotifyMessageListener() throws Exception {
String someMessage = "someMessage";
long someId = 1;
ReleaseMessage someReleaseMessage = assembleReleaseMessage(someId, someMessage);

String someMissingMessage = "someMissingMessage";
long someMissingId = 2;
ReleaseMessage someMissingReleaseMessage = assembleReleaseMessage(someMissingId, someMissingMessage);

String anotherMessage = "anotherMessage";
long anotherId = 3;
ReleaseMessage anotherReleaseMessage = assembleReleaseMessage(anotherId, anotherMessage);

String anotherMissingMessage = "anotherMissingMessage";
long anotherMissingId = 4;
ReleaseMessage anotherMissingReleaseMessage = assembleReleaseMessage(anotherMissingId, anotherMissingMessage);

long someRolledBackId = 5;

String yetAnotherMessage = "yetAnotherMessage";
long yetAnotherId = 6;
ReleaseMessage yetAnotherReleaseMessage = assembleReleaseMessage(yetAnotherId, yetAnotherMessage);

ArrayList<ReleaseMessage> receivedMessage = Lists.newArrayList();
SettableFuture<ReleaseMessage> someListenerFuture = SettableFuture.create();
ReleaseMessageListener someListener = (message, channel) -> receivedMessage.add(message);
releaseMessageScanner.addMessageListener(someListener);

when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(0L)).thenReturn(
Lists.newArrayList(someReleaseMessage));

await().untilAsserted(() -> {
assertEquals(1, receivedMessage.size());
assertSame(someReleaseMessage, receivedMessage.get(0));
});

when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(someId)).thenReturn(
Lists.newArrayList(anotherReleaseMessage));

await().untilAsserted(() -> {
assertEquals(2, receivedMessage.size());
assertSame(someReleaseMessage, receivedMessage.get(0));
assertSame(anotherReleaseMessage, receivedMessage.get(1));
});

when(releaseMessageRepository.findAllById(Sets.newHashSet(someMissingId)))
.thenReturn(Lists.newArrayList(someMissingReleaseMessage));

await().untilAsserted(() -> {
assertEquals(3, receivedMessage.size());
assertSame(someReleaseMessage, receivedMessage.get(0));
assertSame(anotherReleaseMessage, receivedMessage.get(1));
assertSame(someMissingReleaseMessage, receivedMessage.get(2));
});

when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(anotherId)).thenReturn(
Lists.newArrayList(yetAnotherReleaseMessage));

await().untilAsserted(() -> {
assertEquals(4, receivedMessage.size());
assertSame(someReleaseMessage, receivedMessage.get(0));
assertSame(anotherReleaseMessage, receivedMessage.get(1));
assertSame(someMissingReleaseMessage, receivedMessage.get(2));
assertSame(yetAnotherReleaseMessage, receivedMessage.get(3));
});

when(releaseMessageRepository.findAllById(Sets.newHashSet(anotherMissingId, someRolledBackId)))
.thenReturn(Lists.newArrayList(anotherMissingReleaseMessage));

await().untilAsserted(() -> {
assertEquals(5, receivedMessage.size());
assertSame(someReleaseMessage, receivedMessage.get(0));
assertSame(anotherReleaseMessage, receivedMessage.get(1));
assertSame(someMissingReleaseMessage, receivedMessage.get(2));
assertSame(yetAnotherReleaseMessage, receivedMessage.get(3));
assertSame(anotherMissingReleaseMessage, receivedMessage.get(4));
});
}

private ReleaseMessage assembleReleaseMessage(long id, String message) {
Expand Down

0 comments on commit dae4af2

Please sign in to comment.