[ROCKETMQ-102] When shutdown(), the persisted offet is not the latest consumed message, which may cause repeated messages. #64

Closed

Contributor

@Jaskey Jaskey commented Feb 16, 2017

Solution: add an interface method to the push consumer that accepts an await-termination time, so that shutdown waits for in-flight consumption to finish.

JIRA: https://issues.apache.org/jira/browse/ROCKETMQ-102

Unit test in DefaultMQPushConsumerTest.java:

@Test
public void testShutdownAwait() throws Exception {
    final LinkedList<Long> consumedOffset = new LinkedList<>();
    pushConsumer.setPullInterval(0);
    pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
        @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {e.printStackTrace();}
                synchronized (consumedOffset) {
                    consumedOffset.add(msg.getQueueOffset());
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }));
    pushConsumer.getDefaultMQPushConsumerImpl().doRebalance();
    PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
    pullMessageService.executePullRequestImmediately(createPullRequest());
    Thread.sleep(1000);
    pushConsumer.shutdown(10 * 1000); // await in-flight consumption for at most 10 seconds
    // pushConsumer.shutdown(); // without awaiting, this test case does not pass
    long persistOffset = pushConsumer.getDefaultMQPushConsumerImpl().getOffsetStore().readOffset(new MessageQueue(topic, brokerName, 0), ReadOffsetType.READ_FROM_MEMORY);
    Thread.sleep(1000); // give the thread pool time to keep consuming if it was not terminated properly
    Collections.sort(consumedOffset);
    Assert.assertEquals(consumedOffset.getLast() + 1, persistOffset); // with await, the persisted offset should be one past the last consumed message offset
}

@lizhanhui
Contributor

Can you provide some test data, say how many duplicates are found before and after applying this patch?

IMHO, we should make the API as concise as possible.

@Jaskey
Contributor Author

Jaskey commented Feb 16, 2017

@lizhanhui

This problem has existed for a long time; please see this old issue:
https://github.com/alibaba/RocketMQ/issues/367

The number of duplicated messages depends on how many messages are being consumed in the thread pool or waiting in the pending queue at shutdown. This is very easy to hit when consumption is slow and there is a large backlog.

The problem should be addressed, since RocketMQ should prevent duplication as much as possible.

The reason I added a new interface method is to stay compatible with previous versions; actually, a single shutdown(long awaitTerminations) would be enough.

Alternatively, if we don't mind, we could use a field on the push consumer for this. In that case no new interface method is needed, only a new field.
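
To illustrate why the number of duplicates tracks the number of in-flight messages, here is a minimal, self-contained sketch. It does not use RocketMQ: the class `ShutdownOffsetDemo`, the method `lateMessages`, and the counter standing in for the persisted offset are all hypothetical. It only shows that work finishing after the "offset snapshot" is taken is work the snapshot does not cover.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ShutdownOffsetDemo {

    /**
     * Submits slow tasks, shuts the pool down, optionally awaits termination,
     * then takes a snapshot (a stand-in for persisting the consume offset).
     * Returns how many messages finished only after the snapshot was taken.
     */
    static long lateMessages(int messages, long awaitMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicLong consumed = new AtomicLong();
        for (int i = 0; i < messages; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(50); // simulated slow business logic
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                consumed.incrementAndGet();
            });
        }
        try {
            pool.shutdown(); // no new tasks; queued and running tasks continue
            if (awaitMillis > 0) {
                pool.awaitTermination(awaitMillis, TimeUnit.MILLISECONDS);
            }
            long persisted = consumed.get(); // hypothetical "persist offset" step
            pool.awaitTermination(10, TimeUnit.SECONDS); // let the rest drain
            return consumed.get() - persisted;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("late without await: " + lateMessages(10, 0));
        System.out.println("late with await:    " + lateMessages(10, 5000));
    }
}
```

In this sketch, every message counted as "late" was processed but not reflected in the snapshot, which is the situation that would lead to redelivery after a restart.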

@lizhanhui
Contributor

I know this issue has been brought up in the past. What I suggest here is providing some test data to support the rationale for this patch. By doing so, you'll find it easier to convince other developers.

Anyway, thanks a lot for bringing this issue back to attention.

@Jaskey
Contributor Author

Jaskey commented Feb 16, 2017

@lizhanhui Please refer to the test case: just switch the shutdown(long milis) call to shutdown() to reproduce the problem.

@Jaskey
Contributor Author

Jaskey commented Feb 16, 2017

@lizhanhui @zhouxinyu @vintagewang

What do you think of my proposal to add a field called awaitMilisWhenShutdown to DefaultPushConsumer? That would avoid adding a shutdown(long awaitMilis) interface method, at the cost of one more config field.

@coveralls

coveralls commented Feb 16, 2017

Coverage Status

Coverage decreased (-0.3%) to 31.215% when pulling db9e92a on Jaskey:ROCKETMQ-102-shutdown-await into 573b22c on apache:master.


if (awaitTerminateMillis > 0) {
try {
this.consumeExecutor.awaitTermination(awaitTerminateMillis,TimeUnit.MILLISECONDS);
if (!this.consumeExecutor.isTerminated()) log.info("There are messages still being consumed in thread pool, but not going to await them anymore. Have awaited for {} ms",awaitTerminateMillis);
Member

Did you follow our code guidelines [1]? The if block style below is recommended.

if {
}

[1] http://rocketmq.incubator.apache.org/docs/code-guidelines/

Contributor Author

Ok, I will update the pr accordingly.

shutdown(0);
}

public void shutdown(long awaitTerminateMillis) {
Member

Since the MQPushConsumer interface only declares shutdown(), we could consider either pulling shutdown(long awaitTerminateMillis) up to the parent interface, or making shutdown(long awaitTerminateMillis) private and passing a default (non-zero) value from shutdown().

Contributor Author

@zhouxinyu

I understand and agree with your concern. As I said in my previous comment, maybe we could add a configuration option for awaitMills; what do you think? The default value could be 0 to keep the current behavior.

Member

Good idea, it's ok for me.
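
The configuration-field approach agreed on above could look roughly like the following sketch. `AwaitingConsumer` is a hypothetical stand-in, not a RocketMQ class; only the field name `awaitMilisWhenShutdown` comes from the proposal in this thread, and the helper `sleepQuietly` exists purely for the demo. The point is that `shutdown()` keeps its signature and a default of 0 preserves the old non-waiting behavior.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical consumer used only to illustrate the config-field idea. */
public class AwaitingConsumer {
    private final ExecutorService consumeExecutor = Executors.newSingleThreadExecutor();

    // 0 keeps the previous behavior: shutdown() returns without waiting.
    private long awaitMilisWhenShutdown = 0;

    public void setAwaitMilisWhenShutdown(long millis) {
        this.awaitMilisWhenShutdown = millis;
    }

    public void submit(Runnable task) {
        consumeExecutor.submit(task);
    }

    /** Same signature as before; the wait time comes from configuration. */
    public void shutdown() {
        consumeExecutor.shutdown();
        if (awaitMilisWhenShutdown > 0) {
            try {
                consumeExecutor.awaitTermination(awaitMilisWhenShutdown, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public boolean isTerminated() {
        return consumeExecutor.isTerminated();
    }

    /** Demo helper: sleeps and restores the interrupt flag if interrupted. */
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AwaitingConsumer consumer = new AwaitingConsumer();
        consumer.setAwaitMilisWhenShutdown(2000);
        consumer.submit(() -> sleepQuietly(100));
        consumer.shutdown();
        System.out.println("terminated after shutdown: " + consumer.isTerminated());
    }
}
```

Existing callers that never set the field see no change, which is the compatibility argument made in the comments above.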

@zhouxinyu
Member

Hi @Jaskey ,
Thanks for this PR, I added two comments for your reference.

@coveralls

coveralls commented Feb 17, 2017

Coverage Status

Coverage increased (+0.5%) to 31.992% when pulling b178d2f on Jaskey:ROCKETMQ-102-shutdown-await into 573b22c on apache:master.


@Jaskey
Contributor Author

Jaskey commented Feb 20, 2017

@lizhanhui @zhouxinyu

Please review the updated PR, which keeps the push consumer interface unchanged.

@coveralls

Coverage Status

Coverage increased (+0.2%) to 31.691% when pulling d714c6c on Jaskey:ROCKETMQ-102-shutdown-await into 573b22c on apache:master.


@coveralls

Coverage Status

Coverage increased (+0.4%) to 31.919% when pulling 5667cdf on Jaskey:ROCKETMQ-102-shutdown-await into 573b22c on apache:master.


@@ -52,6 +56,7 @@
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Assert;
Member

Hi, let's unify the assert tool and use org.assertj.core.api.Assertions.assertThat.

if (awaitTerminateMillis > 0) {
try {
this.consumeExecutor.awaitTermination(awaitTerminateMillis,TimeUnit.MILLISECONDS);
if (!this.consumeExecutor.isTerminated()) log.info("There are messages still being consumed in thread pool, but not going to await them anymore. Have awaited for {} ms",awaitTerminateMillis);
Member

Maybe we need a common method to shut down an executor gracefully, like:

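public static void shutdownGracefully(ExecutorService executor, long timeout, TimeUnit timeUnit) {
    // Stop accepting new tasks; tasks already submitted keep running.
    executor.shutdown();
    try {
        // Wait for running tasks to finish; if they do not, interrupt them.
        if (!executor.awaitTermination(timeout, timeUnit)) {
            executor.shutdownNow();
            // Give the interrupted tasks the same amount of time to respond.
            if (!executor.awaitTermination(timeout, timeUnit)) {
                LOG.warn(String.format("%s didn't terminate!", executor));
            }
        }
    } catch (InterruptedException e) {
        // The caller was interrupted while waiting: cancel remaining tasks and restore the interrupt flag.
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
}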

Contributor Author

We can, but where should we put this method? In a common util class?

Member

Add a new Java file, ThreadUtils?

Member

Hi @Jaskey ,

I added a ThreadUtils, please refer to here.

Contributor Author

@Jaskey Jaskey Mar 7, 2017

Then maybe you could merge that commit first, and I will merge it in later. Also, I don't think shutdownNow is a good choice for consume services; please refer to my comment.

Member

shutdown is the first choice; shutdownNow is called only if the timeout elapses.

Also, develop will be merged into master in the next release; please refer to our new branching model.

Contributor Author

@zhouxinyu In my opinion, RocketMQ has no right to interrupt what the developer's business logic is doing. It may be running a time-consuming job, such as a transaction or a database insert, so we should leave the task running if the executor has still not terminated.

Besides, since the old version has no termination timeout, 0 is the default. With a timeout of 0, shutdownNow would cause tasks to be interrupted or cancelled immediately, which is not appropriate in my opinion.

I have updated the PR but am still using my old approach; please review it and let's discuss further.
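
The disagreement here is about what happens to a task that is still running when the timeout elapses. The sketch below contrasts the two policies using only `java.util.concurrent`; `ShutdownPolicyDemo` and `taskInterrupted` are hypothetical names, and the 300 ms sleep stands in for something like a database write. It is meant only to show the observable difference, not either side's actual implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownPolicyDemo {

    /**
     * Runs one 300 ms task and waits only 50 ms for it.
     * If forceAfterTimeout is true, shutdownNow() is called after the timeout.
     * Returns true if the task observed an interrupt.
     */
    static boolean taskInterrupted(boolean forceAfterTimeout) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean interrupted = new AtomicBoolean(false);
        executor.submit(() -> {
            try {
                Thread.sleep(300); // simulated long-running business logic
            } catch (InterruptedException e) {
                interrupted.set(true); // the task was cut short
            }
        });
        executor.shutdown();
        try {
            boolean finished = executor.awaitTermination(50, TimeUnit.MILLISECONDS);
            if (!finished && forceAfterTimeout) {
                executor.shutdownNow(); // interrupts the running worker thread
            }
            // Wait so the demo can observe how the task ended.
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("shutdown only, task interrupted:        " + taskInterrupted(false));
        System.out.println("shutdownNow on timeout, task interrupted: " + taskInterrupted(true));
    }
}
```

With shutdown alone the task runs to completion after the caller stops waiting; with shutdownNow on timeout the task must handle `InterruptedException` itself, which is the burden on user code that the comment above objects to.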

@Jaskey
Contributor Author

Jaskey commented Mar 1, 2017

@zhouxinyu

Please check the updated pr.

shutdownNow requires developers to respond to interrupts. They should not have to care about this, and in most cases they won't, since nothing in the old version triggers similar behavior. So I am still using shutdown; after the timeout, the method returns and the rest of the flow stays exactly the same.

BTW, the unit test has been updated to use assertThat.

@coveralls

Coverage Status

Changes Unknown when pulling 67cbdc3 on Jaskey:ROCKETMQ-102-shutdown-await into apache:master.


… consumed message, which may cause repeated messages.

Add configuration to push consumer to accept await termination time to await consuming.
@Jaskey Jaskey changed the base branch from master to develop March 7, 2017 05:40
@coveralls

coveralls commented Mar 7, 2017

Coverage Status

Coverage increased (+0.3%) to 31.243% when pulling 3da9a79 on Jaskey:ROCKETMQ-102-shutdown-await into e3f4251 on apache:develop.

@Jaskey
Contributor Author

Jaskey commented Apr 1, 2017

Repeated messages are a big problem for a message queue, and this is a known issue. When can this PR be reviewed and merged?
@lizhanhui @zhouxinyu @vongosling

@vongosling
Member

@Jaskey I will close this PR. If you run into the same problem again, please let me know.

@vongosling vongosling closed this Jul 14, 2018
@Jaskey
Contributor Author

Jaskey commented Jun 10, 2020

I have opened another issue, #2085, since this feature is still not supported in the latest release @zhouxinyu @vongosling
