Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

消费端怎么设置手动ack #137

Open
lichchw opened this issue Jul 1, 2021 · 17 comments
Open

消费端怎么设置手动ack #137

lichchw opened this issue Jul 1, 2021 · 17 comments

Comments

@lichchw
Copy link

lichchw commented Jul 1, 2021

No description provided.

@keliwang
Copy link
Contributor

keliwang commented Jul 1, 2021

void autoAck(boolean auto);

通过设置这个 autoAck 来调整。

@lichchw
Copy link
Author

lichchw commented Jul 1, 2021

我在消费实时消息的时候,由于我的业务比较耗时,当服务器宕机的时候,那些没有被消费的消息为什么丢失了,服务器重启后也没有消费

@keliwang
Copy link
Contributor

keliwang commented Jul 1, 2021

这里的服务器是你的消费者还是 qmq 的 broker?
消费者重启,没有 ack 的消息应该都会重发,除非消息已经过期了。
无论你的业务多么耗时,都是需要保证你的消费能力是要超过消息发送量的,否则消息肯定会不断堆积。同时消费耗时也不能超过设定的消息过期时间。

@lichchw
Copy link
Author

lichchw commented Jul 1, 2021

实时消息的过期时间默认是多久?哪里可以设置吗?

@keliwang
Copy link
Contributor

keliwang commented Jul 1, 2021

参考这里:https://github.com/qunarcorp/qmq/blob/master/docs/cn/install.md#%E9%85%8D%E7%BD%AE%E6%96%87%E4%BB%B6-1

# 可选,动态生效,messagelog过期时间,单位是小时
messagelog.retention.hours=72
# 可选,动态生效,consumerlog过期时间
consumerlog.retention.hours=72
# 可选,动态生效,pulllog过期时间
pulllog.retention.hours=72

@lichchw
Copy link
Author

lichchw commented Jul 1, 2021 via email

@keliwang
Copy link
Contributor

keliwang commented Jul 1, 2021

qmq 服务端没有 ack 的消息都会重复投递,消费的幂等性是需要消费者自己处理的。

@lichchw
Copy link
Author

lichchw commented Jul 1, 2021

好的,谢谢

@lichchw
Copy link
Author

lichchw commented Jul 3, 2021

你好,未ack的消息重新消费时会报这个错误
java.nio.BufferUnderflowException: null
at java.nio.Buffer.nextGetIndex(Buffer.java:506)
at java.nio.DirectByteBuffer.getLong(DirectByteBuffer.java:771)
at qunar.tc.qmq.store.PullLog.getMessageSequence(PullLog.java:104)
at qunar.tc.qmq.store.DefaultStorage.getMessageSequenceByPullLog(DefaultStorage.java:343)
at qunar.tc.qmq.store.MessageStoreWrapper.getConsumerLogSequence(MessageStoreWrapper.java:358)
at qunar.tc.qmq.store.MessageStoreWrapper.doFindUnAckMessages(MessageStoreWrapper.java:286)
at qunar.tc.qmq.store.MessageStoreWrapper.findUnAckMessages(MessageStoreWrapper.java:254)
at qunar.tc.qmq.store.MessageStoreWrapper.findMessages(MessageStoreWrapper.java:77)
at qunar.tc.qmq.processor.PullMessageWorker.process(PullMessageWorker.java:68)
at qunar.tc.qmq.processor.PullMessageWorker.process(PullMessageWorker.java:33)
at qunar.tc.qmq.concurrent.ActorSystem$Actor.processMessages(ActorSystem.java:173)
at qunar.tc.qmq.concurrent.ActorSystem$Actor.run(ActorSystem.java:155)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at qunar.tc.qmq.concurrent.NamedThreadFactory$1.run(NamedThreadFactory.java:52)
at java.lang.Thread.run(Thread.java:748)

@keliwang
Copy link
Contributor

keliwang commented Jul 3, 2021

这个错误倒是没有见过,你这个未 ack 的消息是多久之前的?

@ToTouchMyheart
Copy link

简单来描述一下问题的过程:
1 往qmq中生产数字为0-99的数据
2 consumer端可以正常消费
3.在消费的过程中kill掉consumer端
4 重启consumer端后 消费的数据均为第一次生产的数据 也就是0

@keliwang
Copy link
Contributor

@ToTouchMyheart 你用的是哪个版本,是 master 分支还是某个 release 版本?

@lichchw
Copy link
Author

lichchw commented Jul 23, 2021

您好,那个问题解决了,是我用的版本不对。
另外再请教一下,多个consumerGroup拉取消息的策略是什么?
我有两台消费者,在消费过程中发现拉取到的消息数量差别很大。

@keliwang
Copy link
Contributor

一个 consumer group 内的多个消费者共同消费主题的消息,消费者性能相同就是基本平分,消费者性能有差异则是消费能力强的消费更多。
多个 consumer group 则是互相隔离的,每个 consumer group 都是消费主题所有的消息。

@lichchw
Copy link
Author

lichchw commented Jul 23, 2021

好的,谢谢啦

@jonyangx
Copy link

jonyangx commented Nov 1, 2022

When use message.ack method .got exception:
java.lang.UnsupportedOperationException: BaseMessage does not support this method
at qunar.tc.qmq.base.BaseMessage.ack(BaseMessage.java:335) ~[qmq-1.1.41.jar:?]

@keliwang
Copy link
Contributor

keliwang commented Nov 2, 2022

When use message.ack method .got exception: java.lang.UnsupportedOperationException: BaseMessage does not support this method at qunar.tc.qmq.base.BaseMessage.ack(BaseMessage.java:335) ~[qmq-1.1.41.jar:?]

ack method only works in consumer side. e.g.

@QmqConsumer(subject = "your subject", consumerGroup = "group", executor = "your executor bean name")
public void onMessage(Message message) {
  message.ack(xxx);
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants