consumeMessageCurrently may cause data loss if the retry queue is not available #797

Closed
shiftyman opened this issue Mar 31, 2022 · 1 comment · Fixed by #860
Labels
bug Something isn't working
Comments

@shiftyman

shiftyman commented Mar 31, 2022

version v2.1.0

push_consumer.go, lines 1037~1040:

if offset >= 0 && !pq.IsDroppd() {
	pc.storage.update(mq, int64(offset), true)
}
if len(msgBackFailed) > 0 {
	subMsgs = msgBackFailed
	time.Sleep(5 * time.Second)
	goto RETRY
}

Note: the code jumps to RETRY only after the offset has already been updated. If the application crashes at this moment, the messages that failed to be sent back are lost.
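To make the crash window concrete, here is a minimal sketch. It is not code from the client: `lostOnRestart` is a hypothetical helper, and it assumes a restarted consumer resumes from the committed offset and that the whole batch was removed from the process queue before the retry.

```go
package main

import "fmt"

// lostOnRestart is a hypothetical helper. It returns the offsets that are
// still waiting for a retry but lie below the committed offset, so a consumer
// that restarts from the committed offset would never see them again.
func lostOnRestart(committed int64, pendingRetry []int64) []int64 {
	var lost []int64
	for _, off := range pendingRetry {
		if off < committed {
			lost = append(lost, off)
		}
	}
	return lost
}

func main() {
	// Batch 10, 11, 12 was consumed; sending 11 back to the retry queue failed.
	// Assumption: the whole batch was already removed, so the committed
	// offset has advanced to 13 while 11 is only held in memory.
	committed := int64(13)
	pendingRetry := []int64{11}

	// A crash during the 5-second sleep drops the in-memory copy of 11.
	fmt.Println(lostOnRestart(committed, pendingRetry)) // prints [11]
}
```

If the committed offset had stayed at 11, the same call would return nothing, because the failed message would be pulled again after the restart.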

@ShannonDing
Member

Yes, something seems wrong here. The right approach is to remove only the messages that succeeded from pq, like this:

for i := 0; i < len(subMsgs); i++ {
	msg := subMsgs[i]
	if !pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
		msg.ReconsumeTimes += 1
		msgBackFailed = append(msgBackFailed, msg)
	} else {
		msgBackSucc = append(msgBackSucc, msg)
	}
}

offset := pq.removeMessage(msgBackSucc...)

if offset >= 0 && !pq.IsDroppd() {
	pc.storage.update(mq, int64(offset), true)
}
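The effect of the suggestion above can be sketched with a toy model. Everything here is hypothetical (`toyQueue`, `commitOffset`, the `sendBack` callback) and is not the client's API. It also assumes that removing messages returns the smallest offset still in the queue, or one past the largest offset seen when the queue is empty. The thread does not confirm that behavior.

```go
package main

import "fmt"

// toyQueue is a hypothetical stand-in for the process queue. It only tracks
// the offsets of messages that have not been removed yet.
type toyQueue struct {
	inFlight map[int64]bool
	maxSeen  int64
}

func newToyQueue(offsets ...int64) *toyQueue {
	q := &toyQueue{inFlight: map[int64]bool{}, maxSeen: -1}
	for _, off := range offsets {
		q.inFlight[off] = true
		if off > q.maxSeen {
			q.maxSeen = off
		}
	}
	return q
}

// remove deletes the given offsets and returns the next offset to commit:
// the smallest offset still in flight, or maxSeen+1 if none remain.
// (Assumed semantics, see the note above.)
func (q *toyQueue) remove(offsets ...int64) int64 {
	for _, off := range offsets {
		delete(q.inFlight, off)
	}
	next := q.maxSeen + 1
	for off := range q.inFlight {
		if off < next {
			next = off
		}
	}
	return next
}

// commitOffset tries to send every offset back, removes only the ones that
// succeeded, and returns the offset to commit plus the offsets left to retry.
func commitOffset(q *toyQueue, batch []int64, sendBack func(int64) bool) (int64, []int64) {
	var succ, failed []int64
	for _, off := range batch {
		if sendBack(off) {
			succ = append(succ, off)
		} else {
			failed = append(failed, off)
		}
	}
	return q.remove(succ...), failed
}

func main() {
	q := newToyQueue(10, 11, 12)
	// Pretend the retry queue rejects offset 11.
	rejects11 := func(off int64) bool { return off != 11 }
	next, failed := commitOffset(q, []int64{10, 11, 12}, rejects11)
	fmt.Println(next, failed) // prints 11 [11]
}
```

Because offset 11 stays in the queue, the committed offset cannot move past it. A crash during the retry sleep would then lead to redelivery rather than loss, at the cost of possibly consuming 12 twice.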

2 participants