Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C++] Support setting priority for the consumer #12076

Merged
merged 3 commits into from
Sep 20, 2021

Conversation

metahys
Copy link
Contributor

@metahys metahys commented Sep 17, 2021

Motivation

#165 added support for dispatching message based on consumer priority. However, for C++ client, there is no way to set such priority on ConsumerConfiguration. This PR is to support setting priority for the consumer in C++ client.

Modifications

  • Add getPriorityLevel and setPriorityLevel methods in ConsumerConfiguration
  • Modify the function signature of Commands::newSubscribe to add additional parameter priority_level with default value 0

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

  • Add unit test for ConsumerConfiguration::getPriorityLevel and ConsumerConfiguration::setPriorityLevel in ConsumerConfigurationTest

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

@merlimat merlimat added this to the 2.9.0 milestone Sep 17, 2021
@merlimat merlimat added component/c++ doc-not-needed Your PR changes do not impact docs labels Sep 17, 2021
pulsar-client-cpp/lib/Commands.cc Outdated Show resolved Hide resolved
@BewareMyPower
Copy link
Contributor

There's something wrong with vcpkg build on Windows currently so that the I'll push a fix soon.

@BewareMyPower
Copy link
Contributor

Please rebase to master to fix the broken CI.

@metahys
Copy link
Contributor Author

metahys commented Sep 19, 2021

Please rebase to master to fix the broken CI.

PTAL @BewareMyPower, running the workflows require approval.

@metahys
Copy link
Contributor Author

metahys commented Sep 19, 2021

It seems that KeySharedConsumerTest will produce non-deterministic results due to using rand() to generate the key. And for some seeds used by srand(), the test will fail.

@BewareMyPower
Copy link
Contributor

And for some seeds used by srand(), the test will fail.

Yeah, it's a flaky test but now it doesn't fail frequently. You can comment /pulsarbot run-failure-checks to rerun the tests.

@metahys
Copy link
Contributor Author

metahys commented Sep 19, 2021

And for some seeds used by srand(), the test will fail.

Yeah, it's a flaky test but now it doesn't fail frequently. You can comment /pulsarbot run-failure-checks to rerun the tests.

OK, got it. Thanks.

@merlimat merlimat merged commit f4f4cdd into apache:master Sep 20, 2021
BewareMyPower added a commit that referenced this pull request Jan 27, 2022
…13883)

In C++ client, there is a corner case that when a reader's start message ID is the last message of a topic, `hasMessageAvailable` returns true. However, it should return false because the start message ID is exclusive and in this case `readNext` would never return a message unless new messages arrived.

The current C++ implementation of `hasMessageAvailable` is from long days ago and has many problems. So this PR migrates the Java implementation of `hasMessageAvailable` to C++ client.

Since after the modifications we need to access `startMessageId` in `hasMessageAvailable`, which is called in a different thread from `connectionOpened` that might modify `startMessageId`. We use a common mutex `mutexForMessageIds` to protect the access to `lastDequedMessageId_` and `lastMessageIdInBroker_`.

To fix the original tests when `startMessageId` is latest, this PR adds a `GetLastMessageIdResponse` as the response of `GetLastMessageId` request. The  `GetLastMessageIdResponse` contains the `consumer_mark_delete_position` introduced from #9652 to compare with `last_message_id` when `startMessageId` is latest.

This change added tests `ReaderTest#testHasMessageAvailableWhenCreated` and `MessageIdTest# testCompareLedgerAndEntryId`.

(cherry picked from commit e50493e)

Fix the conflicts by:
- Remove ReaderImpl::getLastMessageIdAsync introduced from #11723
- Remove getPriorityLevel() method introduced from #12076
- Revert changes of registerConsumer from #12118
- Remove new fields introduced from #13627
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
* [C++] Support setting priority for the consumer

* Fixed formatting

* Fixed code style

Co-authored-by: sunhongyi <sunhongyi@tencent.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs release/2.9.0
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants