Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to enable/disable delayed delivery for messages on namespace #5915

Merged
merged 18 commits into from Jan 21, 2020

Conversation

@wolfstudy
Copy link
Member

wolfstudy commented Dec 23, 2019

Signed-off-by: xiaolong.ran rxl@apache.org

Fixes: #4080

Modifications

  • add set-delayed-delivery policy for namespace
  • add set-delayed-delivery-time policy for namespace
  • add test case
  • update admin cli docs
Signed-off-by: xiaolong.ran <rxl@apache.org>
@wolfstudy wolfstudy self-assigned this Dec 23, 2019
@wolfstudy wolfstudy requested review from sijie, jiazhai and merlimat Dec 23, 2019
@wolfstudy wolfstudy added this to the 2.6.0 milestone Dec 23, 2019
@wolfstudy wolfstudy changed the title Allow to enable/disable delayed delivery for messages on namespace [WIP]Allow to enable/disable delayed delivery for messages on namespace Dec 23, 2019
wolfstudy added 3 commits Dec 23, 2019
Signed-off-by: xiaolong.ran <rxl@apache.org>
Signed-off-by: xiaolong.ran <rxl@apache.org>
Signed-off-by: xiaolong.ran <rxl@apache.org>
@wolfstudy wolfstudy changed the title [WIP]Allow to enable/disable delayed delivery for messages on namespace Allow to enable/disable delayed delivery for messages on namespace Dec 23, 2019
@wolfstudy wolfstudy requested a review from codelipenghui Dec 23, 2019
Signed-off-by: xiaolong.ran <rxl@apache.org>
Copy link
Contributor

codelipenghui left a comment

I think use a single command to enable or disable delay delivery is more convenient.

pulsar-admin namespaces set-delayed-delivery --enable --tick_time_mills 2000 public/default
pulsar-admin namespaces set-delayed-delivery --disable public/default

And we need to add method for get delayed-delivery

pulsar-admin namespaces get-delayed-delivery public/default
{
"enable": true,
"tick_time_mills": 2000
}

BTW, how do we handle exists messages already push to delayed message tracker when disable delayed delivery, send them to consumer immediately?

@@ -64,6 +64,10 @@
@SuppressWarnings("checkstyle:MemberName")
public boolean encryption_required = false;
@SuppressWarnings("checkstyle:MemberName")
public boolean delayed_delivery = true;
@SuppressWarnings("checkstyle:MemberName")
public long delayed_delivery_time = 0;

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Dec 24, 2019

Contributor

It should be delayed_delivery_tick_time

wolfstudy added 3 commits Dec 26, 2019
Signed-off-by: xiaolong.ran <rxl@apache.org>
Signed-off-by: xiaolong.ran <rxl@apache.org>
Signed-off-by: xiaolong.ran <rxl@apache.org>
@wolfstudy

This comment has been minimized.

Copy link
Member Author

wolfstudy commented Dec 26, 2019

@codelipenghui PTAL again, thanks.

Get

$ ./bin/pulsar-admin namespaces get-delayed-delivery public/default

{
  "tickTime" : 1000,
  "active" : true
}

Set

$ ./bin/pulsar-admin namespaces set-delayed-delivery public/default --enable --time 2s
wolfstudy added 2 commits Dec 26, 2019
Signed-off-by: xiaolong.ran <rxl@apache.org>
Signed-off-by: xiaolong.ran <rxl@apache.org>
@wolfstudy

This comment has been minimized.

Copy link
Member Author

wolfstudy commented Dec 27, 2019

ping @codelipenghui PTAL again, thanks.

@sijie

This comment has been minimized.

Copy link
Member

sijie commented Jan 1, 2020

@codelipenghui can you help review this pull request again?

* "tickTime" : 1000, // Enable or disable delayed delivery for messages on a namespace
* "active" : true, // The tick time for when retrying on delayed delivery messages
Comment on lines +1198 to +1199

This comment has been minimized.

Copy link
@congbobo184

congbobo184 Jan 3, 2020

Contributor

are the two notes reversed?

Signed-off-by: xiaolong.ran <rxl@apache.org>
@sijie

This comment has been minimized.

Copy link
Member

sijie commented Jan 6, 2020

@codelipenghui can you review this change?

@@ -718,7 +718,7 @@ public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {

@Override
public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) {
if (!isDelayedDeliveryEnabled) {
if (!isDelayedDeliveryEnabled || !topic.checkActiveDelayedDelivery()) {

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Jan 6, 2020

Contributor

The isDelayedDeliveryEnabled in the dispatcher is for indicating the dispatcher is enable or disable delayed delivery. It's better to update this field while the delayed delivery policy changed because it's more economic than get policy from a hashmap for each message.

@@ -730,6 +730,7 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata
.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
}

delayedDeliveryTracker.get().resetTickTime(topic.checkTickTimeDelayedDelivery());

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Jan 6, 2020

Contributor

Same as above comment, reset tick time when the namespace policy changed is more economic than check need to reset for every message

Signed-off-by: xiaolong.ran <rxl@apache.org>
@wolfstudy

This comment has been minimized.

Copy link
Member Author

wolfstudy commented Jan 19, 2020

Sorry, i missed the message.

@codelipenghui fixed done, please take a look again, thanks.

Signed-off-by: xiaolong.ran <rxl@apache.org>
@@ -718,7 +718,7 @@ public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {

@Override
public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) {
if (!isDelayedDeliveryEnabled) {
if (!isDelayedDeliveryEnabled || !topic.delayedDeliveryEnabled) {

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Jan 20, 2020

Contributor

I think you can remove the isDelayedDeliveryEnabled in dispatcher, maybe we just need it on topic level on subscription level



Comment on lines 302 to 303

This comment has been minimized.

Copy link
@codelipenghui

codelipenghui Jan 20, 2020

Contributor

remove these 2 lines

Copy link
Contributor

codelipenghui left a comment

Looks good to me, just left some minor comments, please take a look

Signed-off-by: xiaolong.ran <rxl@apache.org>
Signed-off-by: xiaolong.ran <rxl@apache.org>
@wolfstudy

This comment has been minimized.

Copy link
Member Author

wolfstudy commented Jan 20, 2020

retest this please

wolfstudy added 3 commits Jan 20, 2020
Signed-off-by: xiaolong.ran <rxl@apache.org>
Signed-off-by: xiaolong.ran <rxl@apache.org>
@wolfstudy

This comment has been minimized.

Copy link
Member Author

wolfstudy commented Jan 21, 2020

run java8 tests
run cpp tests

@wolfstudy

This comment has been minimized.

Copy link
Member Author

wolfstudy commented Jan 21, 2020

run java8 tests

@wolfstudy wolfstudy merged commit f0d339e into apache:master Jan 21, 2020
16 of 22 checks passed
16 of 22 checks passed
cpp-tests cpp-tests
Details
backwards-compatibility backwards-compatibility
Details
process process
Details
thread thread
Details
unit-test-flaky unit-test-flaky
Details
unit-tests unit-tests
Details
cli
Details
function-state
Details
messaging
Details
schema
Details
sql
Details
standalone
Details
tiered-filesystem
Details
tiered-jcloud
Details
License check
Details
unit-tests
Details
unit-tests
Details
unit-tests
Details
unit-tests
Details
Jenkins: C++ / Python Tests SUCCESS
Details
Jenkins: Integration Tests SUCCESS
Details
Jenkins: Java 8 - Unit Tests SUCCESS
Details
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Linked issues

Successfully merging this pull request may close these issues.

4 participants
You can’t perform that action at this time.