After setMaxReconsumeTimes is set, the order consumer will retry indefinitely #6163

Closed
originalHeaed opened this issue Feb 22, 2023 · 2 comments · Fixed by #6164

originalHeaed commented Feb 22, 2023

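Hello!
Version: RocketMQ 5.0.0
Summary: The consumer group has a single consumer with MaxReconsumeTimes set to 1, using the ordered-consumption interface of DefaultMQPushConsumer. Once the retry count reaches 1, the consumer sends the message to the delay queue. The broker then puts the message back into the retry queue instead of the dead-letter queue.
My walkthrough of the source code:
When a message has used up its configured retries and the consumer sends it to the delay queue, the message's retry count == the maximum reconsume times.
When the delay expires, redelivery calls SendMessageProcessor.sendMessage, which internally calls SendMessageProcessor.handleRetryAndDLQ to decide whether the message should go to the dead-letter queue (without incrementing the message's retry count).
Below is the source of SendMessageProcessor.handleRetryAndDLQ: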

private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
        RemotingCommand request,
        MessageExt msg, TopicConfig topicConfig, Map<String, String> properties) {

        String newTopic = requestHeader.getTopic();
        if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
            SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
            if (null == subscriptionGroupConfig) {
                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
                response.setRemark(
                    "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
                return false;
            }
            int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
            if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal() && requestHeader.getMaxReconsumeTimes() != null) {
                maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
            }
            int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
            // Using ">" instead of ">=" to compatible with the case that reconsumeTimes here are increased by client.
            if (reconsumeTimes > maxReconsumeTimes) {
                properties.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "-1");
                newTopic = MixAll.getDLQTopic(groupName);
                int queueIdInt = randomQueueId(DLQ_NUMS_PER_GROUP);
                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                    DLQ_NUMS_PER_GROUP,
                    PermName.PERM_WRITE | PermName.PERM_READ, 0
                );
                msg.setTopic(newTopic);
                msg.setQueueId(queueIdInt);
                msg.setDelayTimeLevel(0);
                if (null == topicConfig) {
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("topic[" + newTopic + "] not exist");
                    return false;
                }
            }
        }
        int sysFlag = requestHeader.getSysFlag();
        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
        }
        msg.setSysFlag(sysFlag);
        return true;
    }

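In the method above, a message enters the dead-letter queue only when reconsumeTimes > maxReconsumeTimes. An ordered message redelivered from the delay queue does not satisfy this condition, so it ends up in the retry queue. The consumer then pulls it again and the same flow repeats, because the message's reconsumeTimes is never incremented.
My question: is the consumer expected to increment reconsumeTimes itself when consuming, and only for messages taken from the retry queue? Or is this a bug?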
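The loop described above can be reproduced in isolation. The following is a minimal sketch, not RocketMQ code: it copies only the `reconsumeTimes > maxReconsumeTimes` comparison from the quoted method, and the class `RetryDlqSketch`, its methods, and the `incrementOnRedelivery` flag are hypothetical names introduced for illustration. The flag models "something increments the counter before the check"; it is not meant to describe how the fix in #6164 works.

```java
final class RetryDlqSketch {

    // Same comparison as in the quoted handleRetryAndDLQ: strictly greater than.
    static boolean goesToDlq(int reconsumeTimes, int maxReconsumeTimes) {
        return reconsumeTimes > maxReconsumeTimes;
    }

    // Simulates repeated redelivery from the delay queue.
    // Returns the round in which the message reaches the DLQ,
    // or -1 if it never does within maxRounds.
    static int roundsUntilDlq(int maxReconsumeTimes, boolean incrementOnRedelivery, int maxRounds) {
        // Per the report, the message arrives with reconsumeTimes == maxReconsumeTimes.
        int reconsumeTimes = maxReconsumeTimes;
        for (int round = 1; round <= maxRounds; round++) {
            if (incrementOnRedelivery) {
                reconsumeTimes++; // hypothetical: counter advances before the check
            }
            if (goesToDlq(reconsumeTimes, maxReconsumeTimes)) {
                return round;
            }
            // Otherwise the message goes back to the retry queue unchanged.
        }
        return -1;
    }

    public static void main(String[] args) {
        // Counter never advances: the check fails every round.
        System.out.println(roundsUntilDlq(1, false, 1000)); // prints -1
        // Counter advances once: the check passes on the first redelivery.
        System.out.println(roundsUntilDlq(1, true, 1000));  // prints 1
    }
}
```

With the counter frozen at the maximum, the strict comparison can never be true, which matches the indefinite retry reported here.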

@ShadowySpirits
Member

Please ask questions in GitHub Discussions; see the Q&A Guidelines.

@RongtongJin changed the title from "After setting setMaxReconsumeTimes, ordered consumption on the consumer side retries indefinitely" to "After setMaxReconsumeTimes is set, the order consumer will retry indefinitely" on Feb 23, 2023
@RongtongJin
Contributor

RongtongJin commented Feb 23, 2023

Hi @originalHeaed, it is indeed a bug and will be fixed ASAP.
