-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support consume back #56
Conversation
64b52d6
to
4046d5d
Compare
Codecov Report
@@ Coverage Diff @@
## master #56 +/- ##
==========================================
+ Coverage 90.59% 91.52% +0.93%
==========================================
Files 35 35
Lines 1723 1806 +83
==========================================
+ Hits 1561 1653 +92
+ Misses 162 153 -9
Continue to review full report at Codecov.
|
dc0796c
to
11039f6
Compare
6a2e213
to
13536e3
Compare
这个我晚点仔细看下 |
@@ -78,6 +78,7 @@ class MQProducer extends ClientConfig { | |||
*/ | |||
async init() { | |||
// this._topicPublishInfoTable.set(this.createTopicKey, new TopicPublishInfo()); | |||
await MQProducer.getDefaultProducer(this.options); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里为啥要 getDefaultProducer ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
初始化一下,CLIENT_INNER_PRODUCER 的 Producer 是默认就要有的
lib/consumer/mq_push_consumer.js
Outdated
})(); | ||
} | ||
|
||
async _consumeMessageLoop(topic, needFilter, tagsSet, handler, subExpression) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这段现在好复杂
consume back 和并发消费,其实可以分开 pr 的,改动都比较大 |
lib/consumer/mq_push_consumer.js
Outdated
if (!msg.tags || !needFilter || tagsSet.includes(msg.tags)) { | ||
try { | ||
if (msg.reconsumeTimes < this.options.maxReconsumeTimes) { | ||
await handler(msg, mq, pq); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里还是一条一条的串行的吧?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
改了,你再看看
37515ec
to
ad31907
Compare
lib/consumer/mq_push_consumer.js
Outdated
this.subscriptions.set(retryTopic, { | ||
subscriptionData: retrySubscriptionData, | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个可以在最开始就初始化好,不用每次都做
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
最开始初始化,没有注册过 handler 的情况下会导致找不到 handler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这一段好像和 handler 无关的吧
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
应该是在初始化的时候就监听统一的 retryTopic,retryTopic 的 handler 比较特殊,就是从 this.subscriptions 里面找原始 topic 的 handler 吐给他
lib/consumer/mq_push_consumer.js
Outdated
this.logger.error('[MQPushConsumer] send reconsume message failed, fall to local retry, msgId: %s', msg.msgId); | ||
// 重试消息发送失败,本地重试 | ||
msg.reconsumeTimes++; | ||
await this._sleep(5000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里如果真的失败的,会导致这批消息重新消费
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里还不如一只让他定时重发,直到成功。
因为直接跑错的效果也是类似的,还会导致其他消息重复消费
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里好像还没改
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
把上面的循环判断去掉了,永远重试
lib/consumer/mq_push_consumer.js
Outdated
|
||
if (this.messageModel === MessageModel.CLUSTERING) { | ||
const retryTopic = MixAll.getRetryTopic(this.consumerGroup); | ||
loops.push((async () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这样会重复注册 retryTopic 的消费逻辑吧?
9d98427
to
a65d043
Compare
lib/consumer/mq_push_consumer.js
Outdated
|
||
async consumeSingleMsg(handler, msg, mq, pq) { | ||
try { | ||
if (msg.reconsumeTimes < this.options.maxReconsumeTimes) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
如果超过 maxResconsumeTimes 应该有个报错吧
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个判断直接去掉了
lib/consumer/mq_push_consumer.js
Outdated
// 并发消费任务 | ||
const consumeTasks = []; | ||
for (const msg of msgs) { | ||
utils.resetRetryTopic(msg, this.consumerGroup); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个应该放到 retryTopic 的订阅逻辑里吧?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
恩,改了,忘记删了
: msg.storeHost; | ||
const thatConsumerGroup = consumerGroup ? consumerGroup : this.consumerGroup; | ||
try { | ||
await this._mqClient.consumerSendMessageBack( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consumerSendMessageBack 和 send retry message 有什么区别?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consumerSendMessageBack 不需要发消息体,send retry message 需要发
c8b0c7f
to
a12360d
Compare
lib/consumer/mq_push_consumer.js
Outdated
this.emit('error', err); | ||
this.logger.error('[MQPushConsumer] send reconsume message failed, fallback to local retry, msgId: %s', msg.msgId); | ||
// 重试消息发送失败,本地重试 | ||
msg.reconsumeTimes++; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个是不是应该挪到上个 catch 里?
其他 +1 |
a12360d
to
beb756c
Compare
3.3.0 |
#34 support RECONSUME_LATER status, send back to broker.