Issue-6403: Consumer received duplicated delayed messages upon restart #6404

Merged


liudezhi2098
Contributor

Motivation
Fixes a case where, after a delayed message has been sent, a consumer that restarts pulls the same message twice. #6403

read entry

// org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers#readMoreEntries
            Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);

            if (!messagesToReplayNow.isEmpty()) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(),
                            consumerList.size());
                }

                havePendingReplayRead = true;
                Set<? extends Position> deletedMessages = asyncReplayEntries(messagesToReplayNow);
                // clear already acked positions from replay bucket

                deletedMessages.forEach(position -> messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(),
                        ((PositionImpl) position).getEntryId()));
                // if all the entries are acked-entries and cleared up from messagesToRedeliver, try to read
                // next entries as readCompletedEntries-callback was never called
                if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
                    havePendingReplayRead = false;
                    readMoreEntries();
                }
            } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
                log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
                        totalUnackedMessages, maxUnackedMessages);
            } else if (!havePendingRead) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
                            consumerList.size());
                }
                havePendingRead = true;
                cursor.asyncReadEntriesOrWait(messagesToRead, this, ReadType.Normal);
            } else {
                log.debug("[{}] Cannot schedule next read until previous one is done", name);
            }

Order of reading messages:
1. getMessagesToReplayNow (replay read)
2. asyncReadEntriesOrWait (normal read)
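
The branch order in the snippet above can be sketched as a small decision function. This is a simplified model rather than the broker code: `ReadOrderSketch`, `ReadKind`, and `nextRead` are hypothetical names, and the three parameters stand in for the dispatcher state that the snippet checks.

```java
// Simplified model of the branch order in readMoreEntries (all names hypothetical).
final class ReadOrderSketch {
    enum ReadKind { REPLAY, BLOCKED, NORMAL, WAIT_FOR_PENDING_READ }

    // replayCount: number of positions currently due for replay
    // blockedOnUnacked: dispatcher is blocked because too many messages are unacknowledged
    // havePendingRead: a normal read is already in flight
    static ReadKind nextRead(int replayCount, boolean blockedOnUnacked, boolean havePendingRead) {
        if (replayCount > 0) {
            return ReadKind.REPLAY;               // replay is always tried first
        }
        if (blockedOnUnacked) {
            return ReadKind.BLOCKED;              // no read is scheduled at all
        }
        if (!havePendingRead) {
            return ReadKind.NORMAL;               // read from the cursor's readPosition
        }
        return ReadKind.WAIT_FOR_PENDING_READ;    // previous normal read has not completed
    }
}
```

The point of the sketch is only the priority: as long as any position is due for replay, the normal read from the cursor is not scheduled.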

send to consumer

                SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
                List<Entry> entriesForThisConsumer = entries.subList(start, start + messagesForC);

                EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
                filterEntriesForConsumer(entriesForThisConsumer, batchSizes, sendMessageInfo);

                c.sendMessages(entriesForThisConsumer, batchSizes, sendMessageInfo.getTotalMessages(),
                        sendMessageInfo.getTotalBytes(), redeliveryTracker);
normal situation

1. ReadType.Normal
The delay time has not arrived yet, so the filterEntriesForConsumer method filters the entry out and it is not sent.
2. ReadType.Replay
The delay time has arrived, so the entry is sent to the consumer.
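
The two steps above can be modelled with a small filter that parks entries until their delay expires. This is a sketch under stated assumptions, not Pulsar's implementation: `DelayFilterSketch`, `filterForConsumer`, and `replayNow` are hypothetical names, entries are plain `long` ids, and time is an abstract `long`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of "filter on Normal read, deliver on Replay read".
final class DelayFilterSketch {
    // entry id -> deliverAt time, for entries held back so far
    private final Map<Long, Long> tracker = new HashMap<>();

    // Normal read: return entries whose delay has expired, park the rest.
    List<Long> filterForConsumer(Map<Long, Long> deliverAtByEntry, long now) {
        List<Long> deliverable = new ArrayList<>();
        // TreeMap copy gives a deterministic, ascending entry-id order
        for (Map.Entry<Long, Long> e : new TreeMap<>(deliverAtByEntry).entrySet()) {
            if (e.getValue() > now) {
                tracker.put(e.getKey(), e.getValue()); // delay not reached: filtered out
            } else {
                deliverable.add(e.getKey());
            }
        }
        return deliverable;
    }

    // Replay read: release every parked entry whose delay has expired by `now`.
    List<Long> replayNow(long now) {
        List<Long> ready = new ArrayList<>();
        for (Map.Entry<Long, Long> e : new TreeMap<>(tracker).entrySet()) {
            if (e.getValue() <= now) {
                ready.add(e.getKey());
            }
        }
        tracker.keySet().removeAll(ready); // each parked entry is released once
        return ready;
    }
}
```

In this model an entry with a future `deliverAt` is never returned by `filterForConsumer`, and `replayNow` returns it exactly once after the delay has passed, which is the single delivery expected in the normal situation.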

abnormal situation

The consumer is stopped before the delay time has arrived and started again after the delay time has arrived.
1. ReadType.Replay
The delay time has arrived, so the entry is sent to the consumer.
2. ReadType.Normal
The delay time has arrived, so nothing filters the entry and the same entry is sent to the consumer a second time.
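
The duplicate can be reproduced in a toy timeline with a single delayed entry. This is a sketch, not broker code: `RestartDuplicateSketch` is a hypothetical name, and the step that moves `readPosition` back to the unacknowledged entry when the consumer stops is an assumption about why the Normal read sees the entry again; the description above does not spell that step out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy timeline for one delayed entry (id 0); all names are hypothetical.
final class RestartDuplicateSketch {
    static List<String> run() {
        final long deliverAt = 100;               // entry 0 becomes deliverable at t=100
        long readPosition = 0;                    // next entry id a Normal read fetches
        TreeSet<Long> tracker = new TreeSet<>();  // entries parked until their delay expires
        List<String> delivered = new ArrayList<>();

        // t=10: Normal read fetches entry 0; the delay has not expired, so it is parked.
        long now = 10;
        long id = readPosition++;
        if (deliverAt > now) tracker.add(id); else delivered.add("Normal:" + id);

        // Consumer stops. ASSUMPTION: the cursor goes back to the first unacknowledged entry.
        readPosition = 0;

        // t=200: consumer starts after the delay expired. The Replay read runs first...
        now = 200;
        for (long parked : tracker) delivered.add("Replay:" + parked);
        tracker.clear();

        // ...then the Normal read fetches entry 0 again, and this time nothing filters it.
        id = readPosition++;
        if (deliverAt > now) tracker.add(id); else delivered.add("Normal:" + id);

        return delivered;
    }
}
```

Under these assumptions `run()` returns `[Replay:0, Normal:0]`: the same entry reaches the consumer once through each read type.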

Changes
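When a replayed position equals readPosition, advance readPosition to the next position, so that the following Normal read does not fetch the same entry again.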

        positions.stream().filter(position -> !alreadyAcknowledgedPositions.contains(position))
                .forEach(p -> {
                    if (((PositionImpl) p).compareTo(this.readPosition) == 0) {
                        this.setReadPosition(this.readPosition.getNext());
                        log.warn("[{}][{}] replayPosition {} equals readPosition {}, need to set next readPosition",
                                ledger.getName(), name, (PositionImpl) p, this.readPosition);
                    }
                    ledger.asyncReadEntry((PositionImpl) p, cb, ctx);
                });
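
The effect of the change can be illustrated in isolation with a stand-in cursor. This is a minimal sketch, assuming a simplified position type: `ReplayCursorSketch` and `Pos` are hypothetical and are not Pulsar's `ManagedCursorImpl` or `PositionImpl`; `next()` only increments the entry id and ignores ledger boundaries, and the filter on already acknowledged positions is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in cursor showing "skip readPosition past a replayed entry" (hypothetical names).
final class ReplayCursorSketch {
    // Minimal ledger position: ordered by ledger id, then entry id.
    static final class Pos implements Comparable<Pos> {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        Pos next() {
            return new Pos(ledgerId, entryId + 1);
        }

        @Override
        public int compareTo(Pos o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }
    }

    Pos readPosition;                               // where the next Normal read starts
    final List<Pos> replayed = new ArrayList<>();   // positions handed to the replay read

    ReplayCursorSketch(Pos readPosition) {
        this.readPosition = readPosition;
    }

    void replay(List<Pos> positions) {
        for (Pos p : positions) {
            if (p.compareTo(readPosition) == 0) {
                // The entry is about to be delivered by replay, so the next
                // Normal read must start after it.
                readPosition = readPosition.next();
            }
            replayed.add(p);
        }
    }
}
```

Replaying position (5, 0) while `readPosition` is (5, 0) moves `readPosition` to (5, 1); replaying any other position leaves it unchanged, so only the overlapping entry is skipped by the next Normal read.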

@sijie added the area/client and type/bug labels Feb 24, 2020
@sijie added this to the 2.6.0 milestone Feb 24, 2020
@sijie changed the title from "fix when send a delayed message ,there is a case when a consumer rest…" to "Issue-6403: Consumer received duplicated delayed messages upon restart" Feb 24, 2020
@sijie
Member

sijie commented Feb 24, 2020

/pulsarbot run-failure-checks

1 similar comment
@codelipenghui
Contributor

/pulsarbot run-failure-checks

@codelipenghui merged commit e71b9fc into apache:master Feb 26, 2020
@jiazhai
Member

jiazhai commented Mar 13, 2020

cherry-picked into branch-2.5

Labels
area/client, release/2.5.1, type/bug

4 participants