Skip to content

Commit

Permalink
[ROCKETMQ-102] When shutdown(), the persisted offet is not the latest…
Browse files Browse the repository at this point in the history
… consumed message, which may cause repeated messages.

Add configuration to push consumer to accept await termination time to await consuming.

fixup
  • Loading branch information
Jaskey committed Feb 20, 2017
1 parent 573b22c commit d714c6c
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 7 deletions.
Expand Up @@ -214,6 +214,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private long consumeTimeout = 15;

/**
* Maximum time to await message consuming when shutdown consumer, 0 indicates no await.
*/
private long awaitTerminationMillisWhenShutdown = 0;

/**
* Default constructor.
*/
Expand Down Expand Up @@ -461,7 +466,7 @@ public void start() throws MQClientException {
*/
@Override
public void shutdown() {
this.defaultMQPushConsumerImpl.shutdown();
this.defaultMQPushConsumerImpl.shutdown(awaitTerminationMillisWhenShutdown);
}

@Override
Expand Down Expand Up @@ -616,4 +621,12 @@ public long getConsumeTimeout() {
public void setConsumeTimeout(final long consumeTimeout) {
this.consumeTimeout = consumeTimeout;
}

public long getAwaitTerminationMillisWhenShutdown() {
return awaitTerminationMillisWhenShutdown;
}

public void setAwaitTerminationMillisWhenShutdown(long awaitTerminationMillisWhenShutdown) {
this.awaitTerminationMillisWhenShutdown = awaitTerminationMillisWhenShutdown;
}
}
Expand Up @@ -92,9 +92,23 @@ public void run() {
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}

public void shutdown() {
@Override
public void shutdown(long awaitTerminateMillis) {
this.scheduledExecutorService.shutdown();
this.consumeExecutor.shutdown();
//await to consume
if (awaitTerminateMillis > 0) {
try {
this.consumeExecutor.awaitTermination(awaitTerminateMillis,TimeUnit.MILLISECONDS);
if (!this.consumeExecutor.isTerminated()) {
log.info("There are messages still being consumed in thread pool, but not going to await them anymore after waiting for {} ms",awaitTerminateMillis);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // set interrupt flag
log.warn("got InterruptedException when await termination");
}
}

this.cleanExpireMsgExecutors.shutdown();
}

Expand Down
Expand Up @@ -92,10 +92,22 @@ public void run() {
}
}

public void shutdown() {
@Override
public void shutdown(long awaitTerminateMillis) {
this.stopped = true;
this.scheduledExecutorService.shutdown();
this.consumeExecutor.shutdown();
//await to consume
if (awaitTerminateMillis > 0) {
try {
this.consumeExecutor.awaitTermination(awaitTerminateMillis,TimeUnit.MILLISECONDS);
if (!this.consumeExecutor.isTerminated()) log.info("There are messages still being consumed in thread pool, but not going to await them anymore. Have awaited for {} ms",awaitTerminateMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // set interrupt flag
log.warn("got InterruptedException when awaitTermination");
}
}

if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
this.unlockAllMQ();
}
Expand Down
Expand Up @@ -24,7 +24,7 @@
public interface ConsumeMessageService {
void start();

void shutdown();
void shutdown(long awaitTerminateMillis);

void updateCorePoolSize(int corePoolSize);

Expand Down
Expand Up @@ -515,12 +515,12 @@ private int getMaxReconsumeTimes() {
}
}

public void shutdown() {
public void shutdown(long awaitTerminateMillis) {
switch (this.serviceState) {
case CREATE_JUST:
break;
case RUNNING:
this.consumeMessageService.shutdown();
this.consumeMessageService.shutdown(awaitTerminateMillis);
this.persistConsumerOffset();
this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
this.mQClientFactory.shutdown();
Expand Down Expand Up @@ -593,7 +593,7 @@ public void start() throws MQClientException {
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
Expand Down
Expand Up @@ -19,10 +19,13 @@
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
Expand All @@ -31,6 +34,7 @@
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
Expand All @@ -52,6 +56,7 @@
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -135,6 +140,7 @@ public void init() throws Exception {
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
messageClientExt.setQueueOffset(((PullMessageRequestHeader)mock.getArgument(1)).getQueueOffset());
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback)mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
Expand Down Expand Up @@ -174,6 +180,37 @@ public void testPullMessage_Success() throws InterruptedException, RemotingExcep
assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
}

@Test
public void testShutdownAwait() throws Exception {
final LinkedList<Long> consumedOffset = new LinkedList<>();
pushConsumer.setPullInterval(0);
pushConsumer.setPullThresholdForQueue(100);
pushConsumer.setAwaitTerminationMillisWhenShutdown(60* 1000);//await consume for at most 60 seconds. If we do not set await millis, this test case will not pass
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {e.printStackTrace();}
synchronized (consumedOffset) {
consumedOffset.add(msg.getQueueOffset());
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}));
pushConsumer.getDefaultMQPushConsumerImpl().doRebalance();
PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestImmediately(createPullRequest());
Thread.sleep(1000);
pushConsumer.shutdown();
long persitOffset =pushConsumer.getDefaultMQPushConsumerImpl().getOffsetStore().readOffset(new MessageQueue(topic, brokerName, 0), ReadOffsetType.READ_FROM_MEMORY);
Thread.sleep(1000);//wait for thread pool to continue consume for sometime if not terminated well
Collections.sort(consumedOffset);
Assert.assertEquals("actual consumed offset is not equals to persist offset when shutdown await", consumedOffset.getLast() + 1, persitOffset);//when shutdown with await, the persisted offset should be the latest message offset
}

@Test
public void testPullMessage_SuccessWithOrderlyService() throws Exception {
final CountDownLatch countDownLatch = new CountDownLatch(1);
Expand Down

0 comments on commit d714c6c

Please sign in to comment.