-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[pulsar-client] Add partition-change api for producer/consumer interceptors #12287
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the interceptor called the first time that we discover the number of partitions or only when the partitions change?
Can we add an assertion in the test about the number of calls to the interceptor?
int newPartitions = numPartitions + 5; | ||
admin.topics().updatePartitionedTopic(topicName.toString(), newPartitions); | ||
|
||
retryStrategically( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about using Awaiatility ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated it.
It's on partition change only. Yes, I have added the assertion by adding incrementing the counter. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
* up/master: (26 commits) [pulsar-admin] Allow setting --forward-source-message-property to false when updating a pulsar function (apache#12128) [website][upgrade]feat: docs migration - Development (apache#12320) Update delete inactive topic configuration documentation (apache#12350) [PIP 95][Issue 12040][broker] Multiple bind addresses for Pulsar protocol (apache#12056) Added Debezium Source for MS SQL Server (apache#12256) Fix: flaky oracle tests (apache#12306) [C++] Use URL encoded content type for OAuth 2.0 authentication (apache#12341) [C++] Handle OAuth 2.0 exceptional cases gracefully (apache#12335) feat(cli): add restart command to pulsar-daemon (apache#12279) [client-tools] Remove redundant initial value (apache#12296) Make AuthenticationTokenTest to run on windows (apache#12329) [offload] fix FileSystemManagedLedgerOffloader can not cleanup outdated ledger data (apache#12309) [Doc]--Update contents for Pulsar adaptor for Apache Spark (apache#12338) [PIP 95][Issue 12040][broker] Improved multi-listener in standalone mode (apache#12066) [website][upgrade]feat: docs migration - Cookbooks (apache#12319) [testclient] Make --payload-file take effect in PerformanceClient (apache#12187) [website][upgrade]feat: docs migration - adaptor (apache#12318) [pulsar-client] Add partition-change api for producer/consumer interceptors (apache#12287) [Transaction]Fix lowWaterMark of TopicTransactionBuffer (apache#12312) [pulsar-admin] New option takes precedence over deprecated option (apache#12260) ... # Conflicts: # site2/website-next/docusaurus.config.js # site2/website-next/versions.json
Motivation
There are usecases for applications, adapters or connectors to know update of number of partitions and handle the change in applications when application is handling individual partitions in the workflow. Therefore, add API in interceptor to notify applications when number of partition changes for the topic.
Modification
void onPartitionsChange(String topicName, int partitions)