
Dispatch messages to the new consumer after the existing consumer has acknowledged all dispatched messages in Key_Shared Subscription. #6554

Closed
codelipenghui opened this issue Mar 19, 2020 · 18 comments · Fixed by #7106
Labels: area/broker, release/2.6.2, type/bug

Comments

@codelipenghui
Contributor

codelipenghui commented Mar 19, 2020

Master issue #4077
Is your feature request related to a problem? Please describe.
Dispatch messages to the new consumer after the existing consumer has acknowledged all dispatched messages in Key_Shared Subscription.

@codelipenghui codelipenghui added the type/feature label Mar 19, 2020
@codelipenghui codelipenghui changed the title from "Dispatch messages to the new consumer after the existing consumer has acknowledged all dispatched messages" to "Dispatch messages to the new consumer after the existing consumer has acknowledged all dispatched messages in Key_Shared Subscription" Mar 19, 2020
@feeblefakie

Let me add a related issue and some of the discussion @codelipenghui and I had about this.

I think sticky consumers won't really solve it, since consumers can easily go down in failure cases and it is pretty hard for applications to manage the scalability of consumers.

@codelipenghui codelipenghui self-assigned this Apr 3, 2020
@poulhenriksen

How come this is marked as a feature and not a bug?

It violates the ordering guarantee of Key_Shared, and basically makes Key_Shared useless if you dynamically add consumers.

I ask because I would like to ensure this bug gets proper attention and is not regarded as a nice-to-have feature.
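
For reference, the per-key ordering guarantee in question is the one a Key_Shared consumer relies on when subscribing with the Java client, roughly as below (the service URL, topic, and subscription names are placeholders):

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

public class KeySharedConsumerExample {
    public static void main(String[] args) throws Exception {
        // Placeholder service URL, topic, and subscription name.
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Consumers created with the same subscription name and Key_Shared type share
        // the topic; messages with the same key are expected to arrive in order at a
        // single consumer. The bug discussed here concerns what happens to that
        // guarantee when another consumer joins while messages are still in flight.
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-key-shared-sub")
                .subscriptionType(SubscriptionType.Key_Shared)
                .subscribe();

        Message<String> msg = consumer.receive();
        consumer.acknowledge(msg);

        consumer.close();
        client.close();
    }
}
```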

@feeblefakie

@codelipenghui Could you share any updates on this?
Sorry for pushing, but I feel this is a really critical bug, and Key_Shared users will run into a lot of trouble if they just use it without knowing about this.

@codelipenghui
Contributor Author

codelipenghui commented Apr 14, 2020

@feeblefakie We are already working on this, and I have drafted an approach. I will create a PIP or push a PR soon.

@codelipenghui codelipenghui added this to the 2.6.0 milestone Apr 14, 2020
@feeblefakie

@codelipenghui Thank you for sharing the update! Good to know. :)

@merlimat merlimat added the type/bug label and removed the type/feature label Apr 23, 2020
@merlimat
Contributor

So, I think the problem here should be addressed by having new consumers join in a "paused" state.

For example, consider this sequence:

  1. The subscription has consumers c1 and c2.
  2. c3 joins. Some of the keys are now supposed to go to c3.
  3. Instead of starting to deliver to c3, we mark the current readPosition of the cursor for c3 (let's call it rp0_c3).
  4. Any message that now hashes to c3 and has messageId >= rp0_c3 will be deferred for later re-delivery.
  5. Any message that might get re-delivered to c3 (e.g. originally sent to c1, but c1 has failed) and has messageId < rp0_c3 will be sent to c3.
  6. When the markDelete position of the cursor moves past rp0_c3, the restriction on c3 is lifted.

Essentially, c3 joins but can only receive old messages, until everything that was read before it joined gets acked.
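
To make the proposal concrete, here is a minimal, self-contained sketch of the per-message check it implies; the Position type and all names are illustrative and do not come from the broker code:

```java
// Minimal sketch of the "paused consumer" check described above.
// The Position type and all names here are illustrative, not the broker's classes.
final class Position implements Comparable<Position> {
    final long ledgerId;
    final long entryId;

    Position(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    @Override
    public int compareTo(Position other) {
        int c = Long.compare(ledgerId, other.ledgerId);
        return c != 0 ? c : Long.compare(entryId, other.entryId);
    }
}

final class RecentlyJoinedConsumerGuard {
    // Cursor read position captured when the consumer joined (rp0_c3 in the example).
    private final Position readPositionAtJoin;

    RecentlyJoinedConsumerGuard(Position readPositionAtJoin) {
        this.readPositionAtJoin = readPositionAtJoin;
    }

    // Only messages read before the consumer joined (typically redeliveries from a
    // failed consumer) may be delivered; anything newer is deferred.
    boolean canDeliver(Position messagePosition) {
        return messagePosition.compareTo(readPositionAtJoin) < 0;
    }

    // Once the markDelete position moves past the join position, everything that was
    // read before the join has been acked, so the restriction can be lifted.
    boolean restrictionLifted(Position markDeletePosition) {
        return markDeletePosition.compareTo(readPositionAtJoin) >= 0;
    }
}
```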

@feeblefakie

@merlimat
I'm not fully sure about the solution, but does it guarantee key ordering?
For example, if a message X with key "A" is first consumed by c1, could another message Y with key "A" possibly go to c3? Can we make sure message X is consumed and acked before message Y?

@merlimat
Contributor

For example, if a message X with key "A" is first consumed by c1, could another message Y with key "A" possibly go to c3? Can we make sure message X is consumed and acked before message Y?

Yes, the above proposal takes care of that.

The important point is that X could be re-sent to c3, in case c1 fails just after c3 has joined.

@codelipenghui
Contributor Author

@merlimat I like the approach you have mentioned.

@codelipenghui
Contributor Author

codelipenghui commented May 19, 2020

@pouledodue @feeblefakie @merlimat I have pushed PR #6977 for this issue. Please help review, thanks.

codelipenghui pushed a commit that referenced this issue Jun 5, 2020

### Motivation

Fixes:  #6554

Ordering is broken in the KeyShared dispatcher if a new consumer `c2` comes in and an existing consumer `c1` goes out.

This is because messages with keys previously assigned to `c1` may now route to `c2`.

The solution here is to have new consumers join in a "paused" state.

For example, consider this sequence:

 1. The subscription has consumers `c1` and `c2`.
 2. `c3` joins. Some of the keys are now supposed to go to `c3`.
 3. Instead of starting to deliver to `c3`, we mark the current readPosition of the cursor for `c3` (let's call it `rp0_c3`).
 4. Any message that now hashes to `c3` and has `messageId >= rp0_c3` will be deferred for later re-delivery.
 5. Any message that might get re-delivered to `c3` (e.g. originally sent to `c1`, but `c1` has failed) and has `messageId < rp0_c3` will be sent to `c3`.
 6. When the markDelete position of the cursor moves past `rp0_c3`, the restriction on `c3` is lifted.

Essentially, `c3` joins but can only receive old messages, until everything that was read before it joined gets acked.
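
As a rough illustration of the bookkeeping this implies on the dispatcher side, here is a hypothetical sketch; the class, method, and field names are not the ones used in the merged PR, and positions are simplified to a single offset instead of a (ledgerId, entryId) pair:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the per-consumer bookkeeping described above.
class RecentlyJoinedConsumers<C> {

    // Consumers that joined recently, mapped to the cursor read position at join time.
    private final Map<C, Long> joinPositions = new LinkedHashMap<>();

    // Step 3: when a consumer joins, remember the cursor's current read position.
    void onConsumerJoined(C consumer, long readPositionAtJoin) {
        joinPositions.put(consumer, readPositionAtJoin);
    }

    // Steps 4 and 5: a message may go to a recently joined consumer only if it was
    // read before that consumer joined (e.g. a redelivery from a failed consumer).
    boolean canSend(C consumer, long messagePosition) {
        Long joinPosition = joinPositions.get(consumer);
        return joinPosition == null || messagePosition < joinPosition;
    }

    // Step 6: once the markDelete position moves past a consumer's join position,
    // everything read before that join has been acked, so the restriction is lifted.
    void onMarkDeleteAdvanced(long markDeletePosition) {
        Iterator<Map.Entry<C, Long>> it = joinPositions.entrySet().iterator();
        while (it.hasNext()) {
            if (markDeletePosition >= it.next().getValue()) {
                it.remove();
            }
        }
    }
}
```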
@codelipenghui codelipenghui reopened this Jun 16, 2020
@codelipenghui
Contributor Author

Reopened per #5819 (comment).

@rocketraman
Member

PR #7106, referred to by #5819 (comment), is marked as closed and merged, with milestone 2.6.0. Should this issue be closed with the 2.6.0 milestone after all?

@codelipenghui
Contributor Author

@rocketraman As discussed with @feeblefakie in the Slack channel, it looks like there are some problems with the auto-split mechanism, so I reopened this issue to track that problem. @feeblefakie has verified that the consistent hashing mechanism works as expected. I will take a look at the auto-split mechanism later.

@feeblefakie

feeblefakie commented Jul 3, 2020

@codelipenghui Should I create a new issue for this?
I realized that I only created a how-to-reproduce doc in my repo and shared it in the closed issue.

@codelipenghui
Contributor Author

@feeblefakie Yes, thanks.

@feeblefakie

@codelipenghui #7455 PTAL.

cdbartholomew pushed a commit to kafkaesque-io/pulsar that referenced this issue Jul 24, 2020
@wolfstudy
Member

The issue will be tracked in #7455, so I'm moving this issue to release/2.6.2.

huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this issue Aug 24, 2020
@wolfstudy
Member

It seems that #8292 has solved this problem; let's consider closing this issue.
