[C++] fix cpp client do AcknowledgeCumulative not clean up previous message #8606

Merged 1 commit on Dec 30, 2020

saosir
Contributor

@saosir saosir commented Nov 18, 2020

In the pulsar-client-cpp consumer, UnAckedMessageTrackerEnabled::removeMessagesTill should erase every message whose ID is <= msgId from messageIdPartitionMap.

Motivation

When the pulsar-client-cpp consumer performs AcknowledgeCumulative, UnAckedMessageTrackerEnabled::removeMessagesTill erases only msgId itself, not every message whose ID is <= msgId.

Modifications

  • When the application calls AcknowledgeCumulative, erase every message whose ID is <= msgId from UnAckedMessageTrackerEnabled, which avoids sending unnecessary redelivery requests for those messages to the broker.
  • Add a unit test for UnAckedMessageTrackerEnabled.
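
To make the intended behavior concrete, here is a minimal sketch of a tracker whose removeMessagesTill erases every tracked ID that is <= msgId. It is not the code changed by this PR: MsgId, TrackerSketch, and the bucket layout are hypothetical simplifications, and the real tracker also handles partitions, batch indexes, locking, and timers.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <set>
#include <tuple>
#include <vector>

// Hypothetical, simplified stand-in for pulsar::MessageId (illustration only).
struct MsgId {
    int64_t ledgerId;
    int64_t entryId;
    bool operator<(const MsgId& other) const {
        return std::tie(ledgerId, entryId) < std::tie(other.ledgerId, other.entryId);
    }
};

// Hypothetical tracker: every tracked id lives in one time bucket, and an
// ordered index maps each id to the bucket that holds it.
class TrackerSketch {
   public:
    explicit TrackerSketch(std::size_t numBuckets) : buckets_(numBuckets) {}

    void add(const MsgId& id, std::size_t bucket) {
        assert(bucket < buckets_.size());
        if (index_.count(id) != 0) return;  // already tracked
        buckets_[bucket].insert(id);
        index_[id] = bucket;
    }

    // Erase every tracked id <= msgId and return how many were removed.
    std::size_t removeMessagesTill(const MsgId& msgId) {
        std::size_t removed = 0;
        const auto end = index_.upper_bound(msgId);  // first id > msgId
        for (auto it = index_.begin(); it != end;) {
            buckets_[it->second].erase(it->first);
            it = index_.erase(it);  // map erase keeps `end` valid
            ++removed;
        }
        return removed;
    }

    std::size_t size() const { return index_.size(); }

   private:
    std::vector<std::set<MsgId>> buckets_;
    std::map<MsgId, std::size_t> index_;
};
```

With five tracked entries (1,0) through (1,4), a cumulative ack of (1,2) should leave only (1,3) and (1,4) in this sketch. Erasing only the exact ID would instead leave four entries behind, which is the behavior described in the Motivation.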

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@BewareMyPower
Contributor

BewareMyPower commented Nov 21, 2020

Could you explain why the last message id should not be removed? See

    void ConsumerImpl::doAcknowledgeCumulative(const MessageId& messageId, ResultCallback callback) {
        this->unAckedMessageTrackerPtr_->removeMessagesTill(messageId);
        this->batchAcknowledgementTracker_.deleteAckedMessage(messageId, proto::CommandAck::Cumulative);
        this->ackGroupingTrackerPtr_->addAcknowledgeCumulative(messageId);

After messages whose id is <= messageId are removed, the messageId would be added to ackGroupingTrackerPtr_ to mark it as an acknowledged message though it's a pending state.

And see Java client's implementation:

    } else if (ackType == AckType.Cumulative) {
        onAcknowledgeCumulative(messageId, null);
        stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(msgId));

    public int removeMessagesTill(MessageId msgId) {
        writeLock.lock();
        try {
            int removed = 0;
            Iterator<MessageId> iterator = messageIdPartitionMap.keySet().iterator();
            while (iterator.hasNext()) {
                MessageId messageId = iterator.next();
                if (messageId.compareTo(msgId) <= 0) {
                    ConcurrentOpenHashSet<MessageId> exist = messageIdPartitionMap.get(messageId);
                    if (exist != null) {
                        exist.remove(messageId);
                    }
                    iterator.remove();
                    removed++;
                }

The Java and C++ clients behave the same way.

@saosir
Contributor Author

saosir commented Nov 23, 2020

@BewareMyPower Sorry, I did not describe it clearly. UnAckedMessageTrackerEnabled::removeMessagesTill should erase every message whose ID is <= msgId from messageIdPartitionMap.

See https://github.com/apache/pulsar/pull/8606/files: the original C++ implementation differs from the Java client's. The C++ client erases only msgId (the function parameter), not every ID that is <= msgId.

@saosir saosir changed the title fix cpp client AcknowledgeCumulative [C++] fix cpp client AcknowledgeCumulative Nov 23, 2020
@merlimat
Contributor

@saosir Can you add a unit test that reproduces the issue? That will also help people to understand the problem.

@BewareMyPower
Contributor

@saosir OK, I see.

@saosir saosir changed the title [C++] fix cpp client AcknowledgeCumulative [C++] fix cpp client do AcknowledgeCumulative not clean up previous message in UnAckedMessageTrackerEnabled Nov 26, 2020
@saosir saosir changed the title [C++] fix cpp client do AcknowledgeCumulative not clean up previous message in UnAckedMessageTrackerEnabled [C++] fix cpp client do AcknowledgeCumulative not clean up previous message Nov 26, 2020
@saosir saosir force-pushed the patch-2 branch 4 times, most recently from a8336f7 to e98394a Compare November 27, 2020 05:00
@sijie
Member

sijie commented Nov 30, 2020

@BewareMyPower Can you review this pull request again?

@BewareMyPower
Contributor

@BewareMyPower Can you review this pull request again?

OK

@saosir
Contributor Author

saosir commented Nov 30, 2020

@BewareMyPower I have two questions here, can you help me answer them?

  1. Both PartitionedConsumerImpl and ConsumerImpl have a member variable unAckedMessageTrackerPtr_ (class UnAckedMessageTrackerEnabled), and PartitionedConsumerImpl is composed of ConsumerImpl instances. If the acknowledgement times out, will both of them send redeliverMessages, duplicating the request?
  2. In UnAckedMessageTrackerEnabled, the msg parameter of UnAckedMessageTrackerEnabled::add contains partition, ledgerId, entryId, and batchIndex. When the ack times out and redeliverUnacknowledgedMessages runs, does it deduplicate based on (ledgerId, entryId)? If not, it will send the same (ledgerId, entryId) repeatedly.
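
For the second question, the kind of deduplication being asked about could look like the sketch below. This is only an illustration of the idea, not something the client is known to do: BatchMsgId and uniqueEntries are hypothetical names, and the fields simply mirror the ones listed in the question.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <utility>
#include <vector>

// Hypothetical flattened message id with the fields named in the question.
struct BatchMsgId {
    int partition;
    int64_t ledgerId;
    int64_t entryId;
    int batchIndex;
};

using EntryKey = std::pair<int64_t, int64_t>;  // (ledgerId, entryId)

// Collapse ids that differ only in batchIndex into one (ledgerId, entryId)
// key each, preserving first-seen order.
std::vector<EntryKey> uniqueEntries(const std::vector<BatchMsgId>& ids) {
    std::set<EntryKey> seen;
    std::vector<EntryKey> out;
    for (const auto& id : ids) {
        const EntryKey key(id.ledgerId, id.entryId);
        if (seen.insert(key).second) {  // true only on first insertion
            out.push_back(key);
        }
    }
    assert(out.size() <= ids.size());  // deduplication never grows the list
    return out;
}
```

Three messages from the same batch entry would then produce a single redelivery key rather than three.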

@BewareMyPower
Contributor

@saosir

From my perspective, both of your concerns are valid. But the Java client behaves the same way: MultiTopicsConsumerImpl and ConsumerImpl both hold an UnAckedMessageTracker instance. The only difference is that the Java client supports chunked messages and the C++ client doesn't.

@saosir
Contributor Author

saosir commented Nov 30, 2020

@BewareMyPower
Does UnAckedMessageTrackerEnabled have the same problem as #8519?

@BewareMyPower
Contributor

Does UnAckedMessageTrackerEnabled have the same problem as #8519?

Yeah, consumerReference_ may be invalid when the timer callback is called.
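
One common way to guard against this kind of lifetime problem is to let the timer callback hold only a weak reference and check it before use. The sketch below shows that general pattern under stated assumptions: ConsumerSketch and makeTimeoutCallback are hypothetical, and this is not necessarily how #8519 or any follow-up fix addresses it.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical consumer that only counts redelivery requests.
struct ConsumerSketch {
    int redeliveries = 0;
    void redeliverUnacknowledgedMessages() { ++redeliveries; }
};

// Build a timer callback that does not keep the consumer alive. It returns
// true if the consumer still existed and redelivery was requested.
std::function<bool()> makeTimeoutCallback(const std::shared_ptr<ConsumerSketch>& consumer) {
    assert(consumer != nullptr);
    std::weak_ptr<ConsumerSketch> weak = consumer;
    return [weak]() {
        if (auto alive = weak.lock()) {  // empty if the consumer was destroyed
            alive->redeliverUnacknowledgedMessages();
            return true;
        }
        return false;  // consumer is gone; skip instead of dereferencing
    };
}
```

If the callback fires after the last shared_ptr to the consumer has been released, lock() yields an empty pointer and the callback does nothing.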

@saosir
Contributor Author

saosir commented Nov 30, 2020

@BewareMyPower
I will submit a fix and a test case for this issue later; please help review it.

@BewareMyPower
Contributor

@saosir It's a pleasure to have your contribution.

By the way, I just looked at the UnAckedMessageTrackerEnabled again. There's something to correct.

When you create a consumer with the unacked message tracker enabled, like:

    Consumer consumer;
    ConsumerConfiguration consumerConf;
    consumerConf.setUnAckedMessagesTimeoutMs(11000);  // must be >= 10000
    consumerConf.setTickDurationInMs(11000);
    Result result = client.subscribe("my-topic", "consumer-1", consumerConf, consumer);

If my-topic has N partitions, N+1 UnAckedMessageTrackerEnabled instances will be created. However, only the tracker of PartitionedConsumerImpl will add message IDs, because when the internal consumers are created, their listener is set to the partitioned consumer's method; see

    const auto shared_this = const_cast<PartitionedConsumerImpl*>(this)->shared_from_this();
    config.setMessageListener(std::bind(&PartitionedConsumerImpl::messageReceived, shared_this, _1, _2));

And ConsumerImpl::messageReceived wouldn't be called, which means ConsumerImpl#notifyPendingReceivedCallback and ConsumerImpl#internalListener wouldn't be called.

Therefore, the trackers of the sub-consumers only perform redelivery and never add any message IDs; see

    for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
        (*i)->redeliverUnacknowledgedMessages(messageIds);
    }

That is, redeliverUnacknowledgedMessages(messageIds) is called for shared and key-shared subscriptions, and redeliverUnacknowledgedMessages() for other subscription types.
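
The dispatch described above can be sketched as follows. The types here are hypothetical stand-ins (SubType, SubConsumerSketch, redeliverOnTimeout, and plain integer IDs); the sketch only shows the choice between the two overloads by subscription type, not the client's actual code.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <vector>

// Hypothetical subscription types, named after the ones mentioned above.
enum class SubType { Exclusive, Shared, Failover, KeyShared };

// Hypothetical sub-consumer that records which overload was called.
struct SubConsumerSketch {
    int fullRedeliveries = 0;
    std::set<int64_t> redeliveredIds;

    void redeliverUnacknowledgedMessages() { ++fullRedeliveries; }
    void redeliverUnacknowledgedMessages(const std::set<int64_t>& ids) {
        redeliveredIds.insert(ids.begin(), ids.end());
    }
};

// On ack timeout, forward the request to every sub-consumer: per-message
// redelivery for Shared/KeyShared, redeliver-everything otherwise.
void redeliverOnTimeout(std::vector<SubConsumerSketch>& consumers, SubType type,
                        const std::set<int64_t>& timedOutIds) {
    assert(!consumers.empty());
    const bool perMessage = (type == SubType::Shared || type == SubType::KeyShared);
    for (auto& consumer : consumers) {
        if (perMessage) {
            consumer.redeliverUnacknowledgedMessages(timedOutIds);
        } else {
            consumer.redeliverUnacknowledgedMessages();
        }
    }
}
```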

@saosir
Contributor Author

saosir commented Nov 30, 2020

@BewareMyPower For the internal consumers of PartitionedConsumerImpl, ConsumerImpl::messageReceived is called first, from ClientConnection; see

    // Unlock the mutex before notifying the consumer of the
    // new received message
    lock.unlock();
    consumer->messageReceived(shared_from_this(), msg, isChecksumValid, msgMetadata, payload);

Then ConsumerImpl#internalListener is called and notifies PartitionedConsumerImpl of the received message through the listener set in ConsumerConfiguration (here, PartitionedConsumerImpl::messageReceived).
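
The call chain described in this comment can be modeled with a small sketch. ChildSketch and ParentSketch are hypothetical stand-ins for the internal ConsumerImpl and PartitionedConsumerImpl, and string payloads replace real messages; the point is only that the child's own messageReceived runs before it forwards to the listener bound to the parent.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical internal consumer: sees the message first, then forwards it
// to whatever listener was configured for it.
struct ChildSketch {
    std::function<void(const std::string&)> listener;
    std::vector<std::string> seen;

    void messageReceived(const std::string& msg) {
        seen.push_back(msg);  // the child handles the message first
        assert(listener);     // this sketch assumes a listener is always set
        listener(msg);        // then notifies the configured listener
    }
};

// Hypothetical partitioned consumer: installs its own method as the
// child's listener, so it is notified after the child.
struct ParentSketch {
    ChildSketch child;
    std::vector<std::string> seen;

    ParentSketch() {
        child.listener = [this](const std::string& msg) { messageReceived(msg); };
    }
    // The listener captures `this`, so copying would leave it dangling.
    ParentSketch(const ParentSketch&) = delete;
    ParentSketch& operator=(const ParentSketch&) = delete;

    void messageReceived(const std::string& msg) { seen.push_back(msg); }
};
```

In this model, both the child and the parent observe every message, which is the situation the first question about duplicate tracking was concerned with.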

@saosir saosir force-pushed the patch-2 branch 4 times, most recently from f846daa to ed3704c Compare November 30, 2020 14:56
@saosir
Contributor Author

saosir commented Dec 1, 2020

I reset the branch to the first commit.

@BewareMyPower
Contributor

Thanks. Please update the associated PR description too.

Also I think the tests could still be added without the refactor, like what AckGroupingTrackerEnabled tests did.

@saosir saosir force-pushed the patch-2 branch 4 times, most recently from 8ed9502 to d3f76e5 Compare December 1, 2020 11:10
@saosir
Contributor Author

saosir commented Dec 1, 2020

Thanks. Please update the associated PR description too.

Also I think the tests could still be added without the refactor, like what AckGroupingTrackerEnabled tests did.

All done.

@BewareMyPower
Contributor

/pulsarbot run-failure-checks

long size() { return UnAckedMessageTrackerEnabled::size(); }
}; // class UnAckedMessageTrackerEnabledMock

TEST(BasicEndToEndTest, testtUnAckedMessageTrackerDefaultBehavior) {
Contributor

A typo: testtUnAcked -> testUnAcked

Contributor Author

Do I need to resubmit the code to fix this problem? Could someone fix it when merging the pull request?

Contributor

I think a committer could fix it when merging the PR, so you needn't make any changes. @jiazhai could you help take a look?

Member

@sijie sijie Dec 30, 2020

@saosir Can you create a follow-up PR to fix the typo?

Contributor

@BewareMyPower BewareMyPower left a comment

LGTM, I just left a comment about a typo.

@BewareMyPower
Contributor

/pulsarbot run-failure-checks

@saosir
Contributor Author

saosir commented Dec 25, 2020

@BewareMyPower The CI - Integration - Cli / cli (pull_request) check fails; see #8790. Do I need to rebase onto the master branch?

@BewareMyPower
Contributor

@saosir Yeah, your branch is far behind master; it's better to rebase onto the latest master.

@saosir
Contributor Author

saosir commented Dec 26, 2020

/pulsarbot run-failure-checks

@BewareMyPower
Contributor

LGTM. @jiazhai Could you help take a look?

@sijie sijie merged commit e75de48 into apache:master Dec 30, 2020
@saosir saosir deleted the patch-2 branch January 5, 2021 13:53
codelipenghui pushed a commit that referenced this pull request Jan 7, 2021
…essage (#8606)

### Motivation
When the pulsar-client-cpp consumer performs AcknowledgeCumulative, `UnAckedMessageTrackerEnabled::removeMessagesTill` erases only `msgId` itself, not every message whose ID is <= `msgId`.

### Modifications

- When the application calls AcknowledgeCumulative, erase every message whose ID is <= `msgId` from UnAckedMessageTrackerEnabled, which avoids sending unnecessary redelivery requests for those messages to the broker.
- Add a unit test for `UnAckedMessageTrackerEnabled`.

(cherry picked from commit e75de48)
@codelipenghui codelipenghui added the cherry-picked/branch-2.7 Archived: 2.7 is end of life label Jan 7, 2021