-
Notifications
You must be signed in to change notification settings - Fork 12k
Closed
Description
Before Creating the Bug Report
-
I found a bug, not just asking a question, which should be created in GitHub Discussions.
-
I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.
-
I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.
Runtime platform environment
windows 11
RocketMQ version
java 5.1.0
JDK Version
无
Describe the Bug
在Broker事务回查的check方法中 , 我看到doneOpOffset.add()这一行的前提是要求opMsgMap.get(removedOpOffset).size() == 0 , 根据提交记录是为了支持Batch OPMsg功能 , 据我理解是一条OpMsg可以关联多条Half消息 .
if (removeMap.containsKey(i)) {
log.debug("Half offset {} has been committed/rolled back", i);
Long removedOpOffset = removeMap.remove(i);
opMsgMap.get(removedOpOffset).remove(i);
if (opMsgMap.get(removedOpOffset).size() == 0) {
opMsgMap.remove(removedOpOffset);
doneOpOffset.add(removedOpOffset); //here
}
}但是我在继续翻阅代码的过程中,发现下述代码中,标记 here的这一行doneOpOffset.add(tmpOpOffset); 却没有判断这条opMsg关联的half消息是否都处理过了 , 而是直接添加到了doneOpOffset中,相当于没有支持Batch OpMsg功能 , 感觉是有问题的 .
private boolean checkPrepareQueueOffset(HashMap<Long, Long> removeMap, List<Long> doneOpOffset,
MessageExt msgExt, String checkImmunityTimeStr) {
String prepareQueueOffsetStr = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
if (null == prepareQueueOffsetStr) {
return putImmunityMsgBackToHalfQueue(msgExt);
} else {
long prepareQueueOffset = getLong(prepareQueueOffsetStr);
if (-1 == prepareQueueOffset) {
return false;
} else {
if (removeMap.containsKey(prepareQueueOffset)) {
long tmpOpOffset = removeMap.remove(prepareQueueOffset);
doneOpOffset.add(tmpOpOffset); //here
log.info("removeMap contain prepareQueueOffset. real_topic={},uniqKey={},immunityTime={},offset={}",
msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC),
msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
checkImmunityTimeStr,
msgExt.getQueueOffset());
return true;
} else {
return putImmunityMsgBackToHalfQueue(msgExt);
}
}
}
}Steps to Reproduce
无
What Did You Expect to See?
无
What Did You See Instead?
无
Additional Context
无
Metadata
Metadata
Assignees
Labels
No labels