[C++] Fix Consumer send redeliverMessages repeatedly #9072

Merged
merged 9 commits into from
Jan 6, 2021

Conversation

saosir
Contributor

@saosir saosir commented Dec 28, 2020

Fixes #9028

Motivation

Both PartitionedConsumerImpl and ConsumerImpl have a member variable unAckedMessageTrackerPtr_ (of class UnAckedMessageTrackerEnabled), and PartitionedConsumerImpl is composed of ConsumerImpl instances. When an acknowledgement times out, both the parent and its child consumers send redeliverMessages, so the request is sent repeatedly. MultiTopicsConsumerImpl has the same problem.

Modifications

  • add a hasParent_ field to ConsumerImpl that indicates whether the consumer has a parent
  • add a unit test to verify the redelivery request won't be sent repeatedly
  • add the GTest header gtest/gtest_prod.h to access private members in unit tests
  • fix a typo
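
A minimal sketch of the duplicate-request problem and of how a parent flag can avoid it. All class and function names here are hypothetical, simplified stand-ins, not the real pulsar-client-cpp classes, and the way the flag is used is an assumption made for illustration:

```cpp
#include <cassert>
#include <set>

// Hypothetical stand-in that only counts redelivery requests it receives.
struct Broker {
    int redeliverRequests = 0;
};

// Hypothetical child consumer with its own unacked-message tracking.
class ChildConsumer {
   public:
    ChildConsumer(Broker& broker, bool hasParent) : broker_(broker), hasParent_(hasParent) {}

    void track(int msgId) {
        // Assumption: a child owned by a parent leaves ack-timeout tracking to the parent.
        if (!hasParent_) unacked_.insert(msgId);
    }

    // Simulates the child's own ack-timeout timer firing.
    void onAckTimeout() {
        if (unacked_.empty()) return;
        ++broker_.redeliverRequests;
        unacked_.clear();
    }

    // Called by the parent when the parent's tracker times out.
    void redeliver() { ++broker_.redeliverRequests; }

   private:
    Broker& broker_;
    bool hasParent_;
    std::set<int> unacked_;
};

// Hypothetical parent consumer that is composed of one child and tracks messages too.
class ParentConsumer {
   public:
    ParentConsumer(Broker& broker, bool markChildren) : child_(broker, markChildren) {}

    void receive(int msgId) {
        child_.track(msgId);
        unacked_.insert(msgId);
    }

    // Simulates both timers firing for the same unacknowledged message.
    void onAckTimeout() {
        child_.onAckTimeout();
        if (!unacked_.empty()) {
            child_.redeliver();
            unacked_.clear();
        }
    }

   private:
    ChildConsumer child_;
    std::set<int> unacked_;
};

// Returns how many redelivery requests one timed-out message produces.
inline int redeliveriesAfterTimeout(bool markChildren) {
    Broker broker;
    ParentConsumer parent(broker, markChildren);
    parent.receive(1);
    parent.onAckTimeout();
    return broker.redeliverRequests;
}
```

In this model, redeliveriesAfterTimeout(false) counts two requests for a single timed-out message, while redeliveriesAfterTimeout(true) counts one.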

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@codelipenghui
Contributor

@BewareMyPower Please help review this PR.

Contributor

@BewareMyPower BewareMyPower left a comment

Could you add a test like #4595 did?

@saosir
Contributor Author

saosir commented Dec 29, 2020

@BewareMyPower Is it necessary to reset the batchIndex of the MessageId when tracking a message? Only the ack of the last message in a batch removes the entry from UnAckedMessageTrackerEnabled:

void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
    ResultCallback cb = std::bind(&ConsumerImpl::statsCallback, shared_from_this(), std::placeholders::_1,
                                  callback, proto::CommandAck_AckType_Individual);
    if (msgId.batchIndex() != -1 &&
        !batchAcknowledgementTracker_.isBatchReady(msgId, proto::CommandAck_AckType_Individual)) {
        cb(ResultOk);
        return;
    }
    doAcknowledgeIndividual(msgId, cb);
}

But the msgId parameter of UnAckedMessageTrackerEnabled#add may contain a batchIndex:

bool UnAckedMessageTrackerEnabled::add(const MessageId& m) {
    std::lock_guard<std::mutex> acquire(lock_);
    if (messageIdPartitionMap.count(m) == 0) {
        std::set<MessageId>& partition = timePartitions.back();
        bool emplace = messageIdPartitionMap.emplace(m, partition).second;
        bool insert = partition.insert(m).second;
        return emplace && insert;
    }
    return false;
}
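
To illustrate the concern, here is a small sketch with a hypothetical, simplified message ID (the real MessageId has more fields, and the tracker is modeled as a plain std::set). It assumes that only the ID passed with the final ack of a batch is removed from the tracker:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <set>
#include <tuple>

// Hypothetical simplified message ID, for illustration only.
struct SimpleMessageId {
    int64_t ledgerId;
    int64_t entryId;
    int32_t batchIndex;  // -1 means "not part of a batch"

    bool operator<(const SimpleMessageId& other) const {
        return std::tie(ledgerId, entryId, batchIndex) <
               std::tie(other.ledgerId, other.entryId, other.batchIndex);
    }
};

// Drops the batch index so every message of one batch maps to the same key.
inline SimpleMessageId withoutBatchIndex(const SimpleMessageId& id) {
    return SimpleMessageId{id.ledgerId, id.entryId, -1};
}

// Tracks every message of one batch, then removes only the ID that arrives
// with the final ack. Returns how many entries are left in the tracker.
inline std::size_t leftoverAfterBatchAck(int32_t batchSize, bool normalize) {
    std::set<SimpleMessageId> tracked;
    for (int32_t i = 0; i < batchSize; ++i) {
        SimpleMessageId id{1, 1, i};
        tracked.insert(normalize ? withoutBatchIndex(id) : id);
    }
    SimpleMessageId last{1, 1, batchSize - 1};
    tracked.erase(normalize ? withoutBatchIndex(last) : last);
    return tracked.size();
}
```

In this model, a batch of three messages leaves two stale entries when the batch index is kept, and none when it is reset before tracking.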

@BewareMyPower
Contributor

@saosir Yeah, your concern is right.

@sijie
Member

sijie commented Jan 4, 2021

@BewareMyPower Can you review this PR again?

@BewareMyPower
Contributor

@sijie I think a unit test is required for this change, but the recent changes don't contain one.

@saosir could you add a test to verify the redelivery request won't be sent repeatedly?

@saosir
Contributor Author

saosir commented Jan 5, 2021

@BewareMyPower Test cases have been added, and the test cases use gtest to access private member variables. I don’t know if this is appropriate.

Help review it, thanks!

@BewareMyPower
Contributor

@saosir It's not a problem to access private members in tests. I'll take a look.

@saosir
Contributor Author

saosir commented Jan 5, 2021

#9072 (comment)

I will submit another PR to deal with this problem later.

Contributor

@BewareMyPower BewareMyPower left a comment

The code itself looks good to me. However, introducing FRIEND_TEST to code under lib/ is not a good approach.

Currently, users can pass -DBUILD_TESTS=OFF in the CMake build phase to skip tests and build only the libraries. However, with the changes in this PR, that build will fail because it adds an extra dependency (GTest) to the lib/ subdirectory.

I think you can find an alternative to FRIEND_TEST, or use a conditional macro to hide the gtest code under lib/. You could also just add the GTest dependency in CMakeLists.txt, but that's not recommended because the source code without tests should not depend on GTest.
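
A rough sketch of the conditional-macro option. PULSAR_ENABLE_TEST_FRIENDS is a hypothetical macro name that the test build would have to define (for example from CMake when BUILD_TESTS is ON), and SimpleTracker is a made-up stand-in for a class under lib/:

```cpp
#include <cassert>
#include <cstddef>
#include <set>

#if defined(PULSAR_ENABLE_TEST_FRIENDS)  // hypothetical macro set by the test build
#include <gtest/gtest_prod.h>            // provides the real FRIEND_TEST
#else
// Library-only build: expand to a harmless declaration so lib/ needs no GTest.
#define FRIEND_TEST(test_case_name, test_name) static_assert(true, "FRIEND_TEST disabled")
#endif

// Hypothetical stand-in for a class under lib/ with private state a test wants to read.
class SimpleTracker {
   public:
    bool add(int id) { return ids_.insert(id).second; }
    std::size_t size() const { return ids_.size(); }

   private:
    FRIEND_TEST(SimpleTrackerTest, testAdd);
    std::set<int> ids_;
};
```

With this approach the library and the tests would have to be compiled with the same macro setting, since the class definition differs between the two configurations.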

@saosir
Contributor Author

saosir commented Jan 5, 2021

How about adding the GTest header gtest/gtest_prod.h to the pulsar-client-cpp/include directory?

@BewareMyPower
Contributor

Yeah, it's a simple and good solution.

@saosir
Contributor Author

saosir commented Jan 5, 2021

@BewareMyPower I have a question.

Why is the default routing policy of PartitionedProducer SinglePartition in pulsar-client-cpp:

routingMode(ProducerConfiguration::UseSinglePartition),

while in pulsar-client-java it is RoundRobinPartition?

if(conf.getMessageRoutingMode() == null && conf.getCustomMessageRouter() == null) {
messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
} else if(conf.getMessageRoutingMode() == null && conf.getCustomMessageRouter() != null) {

@BewareMyPower
Contributor

@saosir I'm also not sure why it's the default routing mode. It can confuse Pulsar beginners, as it confused me before. But since changing the default behavior may affect existing applications, I'd prefer to keep it as the default.

@saosir
Contributor Author

saosir commented Jan 5, 2021

CI checks the license headers, but gtest/gtest_prod.h carries the Google license, which causes the check to fail:

Failed to execute goal com.mycila:license-maven-plugin:4.0.rc2:check (default) on project pulsar: Some files do not have the expected license header

@BewareMyPower
Contributor

@saosir You can add this file to pom.xml's excludes:

      <plugin>
        <groupId>com.mycila</groupId>
        <artifactId>license-maven-plugin</artifactId>
        <version>${license-maven-plugin.version}</version>
        <configuration>
          <licenseSets>
            <licenseSet>
              <header>${pulsar.basedir}/src/license-header.txt</header>
              <excludes>
                <!-- TODO: add it here -->

@saosir
Contributor Author

saosir commented Jan 5, 2021

Why is it blankPartitions + 1 in the UnAckedMessageTrackerEnabled constructor? If timeoutMs_ == tickDurationInMs_, it will wait 2*timeoutMs_ before sending the redelivery request at the first timeout.

int blankPartitions = (int)std::ceil((double)timeoutMs_ / tickDurationInMs_);
for (int i = 0; i < blankPartitions + 1; i++) {
    std::set<MessageId> msgIds;
    timePartitions.push_back(msgIds);
}

@BewareMyPower
Contributor

I think it's just a mistake. I guess the original author intended to do:

 int blankPartitions = (timeoutMs_ / tickDurationInMs_) + ((timeoutMs_ % tickDurationInMs_ == 0) ? 0 : 1);
 for (int i = 0; i < blankPartitions; i++) { 
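
As a worked check of the arithmetic only, the two partition counts can be compared with a couple of helper functions. The helper names are hypothetical, and this sketch does not settle whether the extra partition is intentional:

```cpp
#include <cassert>
#include <cmath>

// Partition count as the constructor computes it today: ceil(timeout / tick) + 1.
inline int partitionsWithExtra(long timeoutMs, long tickMs) {
    return static_cast<int>(std::ceil(static_cast<double>(timeoutMs) / tickMs)) + 1;
}

// Partition count using integer ceiling division with no extra partition.
inline int partitionsCeilOnly(long timeoutMs, long tickMs) {
    return static_cast<int>(timeoutMs / tickMs + ((timeoutMs % tickMs == 0) ? 0 : 1));
}
```

For a 10000 ms timeout and a 10000 ms tick, the first formula yields 2 partitions and the second yields 1. With a 3000 ms tick they yield 5 and 4.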

@saosir
Contributor Author

saosir commented Jan 5, 2021

I agree with you. The Java client has the same problem:

int blankPartitions = (int)Math.ceil((double)this.ackTimeoutMillis / this.tickDurationInMs);
for (int i = 0; i < blankPartitions + 1; i++) {
    timePartitions.add(new ConcurrentOpenHashSet<>(16, 1));
}

@saosir
Contributor Author

saosir commented Jan 6, 2021

/pulsarbot run-failure-checks

@saosir
Contributor Author

saosir commented Jan 6, 2021

/pulsarbot run-failure-checks

@sijie sijie merged commit 9894b99 into apache:master Jan 6, 2021
@saosir saosir deleted the patch-4 branch January 6, 2021 11:01
@codelipenghui codelipenghui added the cherry-picked/branch-2.7 Archived: 2.7 is end of life label Jan 7, 2021
codelipenghui pushed a commit that referenced this pull request Jan 7, 2021
(cherry picked from commit 9894b99)
zymap added a commit to zymap/pulsar that referenced this pull request Mar 5, 2021
---

**Motivation**

PR apache#9072 introduces the gtest file but does not exclude for apache-rat
check. So that causes the apache-rat check to fail.
merlimat pushed a commit that referenced this pull request Mar 5, 2021
* Fix the apache-rat check
---

**Motivation**

PR #9072 introduces the gtest file but does not exclude for apache-rat
check. So that causes the apache-rat check to fail.

* Move the test to the another file

* Address comments
zymap added a commit that referenced this pull request Mar 6, 2021
(cherry picked from commit 0d5f070)
mlyahmed pushed a commit to mlyahmed/pulsar that referenced this pull request Mar 7, 2021
zymap added a commit to streamnative/pulsar-archived that referenced this pull request Mar 8, 2021
(cherry picked from commit 0d5f070)
(cherry picked from commit c568834)
Labels
cherry-picked/branch-2.7 Archived: 2.7 is end of life release/2.7.1
Development

Successfully merging this pull request may close these issues.

[C++] PartitionedConsumerImpl send redeliverMessages repeatedly
4 participants