Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Able to handling messages with multiple listener threads in order for the Key_Shared subscription. #9329

Merged

Conversation

codelipenghui
Copy link
Contributor

@codelipenghui codelipenghui commented Jan 26, 2021

Motivation

Currently, a consumer is pinged to a given listener thread to ensure the message is processed ordered for a topic. But for the Key_Shared subscription, the message process order is based on the message key, not the order of the topic. So this PR is able to handle messages with multiple listener threads for the Key_Shared subscription which also keeps the order of message key.

Verifying this change

New tests added.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@codelipenghui codelipenghui self-assigned this Jan 26, 2021
@codelipenghui codelipenghui added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Jan 26, 2021
@codelipenghui codelipenghui added this to the 2.8.0 milestone Jan 26, 2021
@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

2 similar comments
@hangc0276
Copy link
Contributor

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

Copy link
Member

@sijie sijie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall the change looks good to me. Just one comment about the OrderedExecutor.

@@ -80,6 +83,7 @@
protected volatile long incomingMessagesSize = 0;
protected volatile Timeout batchReceiveTimeout = null;
protected final Lock reentrantLock = new ReentrantLock();
private OrderedExecutor orderedExecutor = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you just change the listenerExecutor to OrderedExecutor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh @sijie, I have checked the details seems currently the listenerExecutor used by some other operations and the listener executor is a ScheduledExecutorService, the consumer will always get a listenerExecutor with 1 thread, so we can't change the listenerExecutor to OrderedExecutor directly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we use OrderedScheduler? BookKeeper common library also provides an OrderedScheduler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have addressed the comment, please take a look.

@codelipenghui codelipenghui force-pushed the penghui/key_shared_listener_ordered branch from 63e7271 to 7a418f3 Compare January 28, 2021 13:23
@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@sijie
Copy link
Member

sijie commented Feb 11, 2021

/pulsarbot run-failure-checks

@sijie
Copy link
Member

sijie commented Feb 12, 2021

@codelipenghui I think there are shading issues. Can you fix it?

@codelipenghui
Copy link
Contributor Author

@sijie Yes, the interesting thing is I follow the steps of the github workflow, it can get passed on my laptop. And I have verified the pulsar-client and pulsar-client-all locally, also works.

@codelipenghui
Copy link
Contributor Author

@merlimat Sorry, I missed the community meeting. I have updated the description of this PR. This PR is not to try to fix the ordering issue of the consumer under the Key_Shared subscription, this PR is to try to allow a consumer to use multiple listener threads for the Key_Shared subscription and keep the message with the same key processed by the same thread.

@@ -69,7 +70,8 @@
protected final CompletableFuture<Consumer<T>> subscribeFuture;
protected final MessageListener<T> listener;
protected final ConsumerEventListener consumerEventListener;
protected final ExecutorService listenerExecutor;
protected final ExecutorProvider executorProvider;
protected final ScheduledExecutorService pingedExecutor;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear to me what pingedExecutor means here. Does that mean "pinned"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sorry. pinned

return getExecutor(object == null ? -1 : object.hashCode() & Integer.MAX_VALUE);
}

public ExecutorService getExecutor(int hash) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, from API perspective, it would be better to just expose an Object (or String) for selecting the thread. Couldn't we calculate the hash internally always?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, we can move the key hash calculate internally and replace this method to getExecutor(byte[]).

@codelipenghui
Copy link
Contributor Author

@merlimat I have addressed the comments, PTAL.

@codelipenghui codelipenghui merged commit e4523e0 into apache:master Feb 21, 2021
@codelipenghui codelipenghui deleted the penghui/key_shared_listener_ordered branch February 21, 2021 03:12
lhotari added a commit to lhotari/pulsar that referenced this pull request Feb 22, 2021
- apache#9630 and apache#9329 were merged concurrently which caused the master branch
  to break

- fixes SpotBugs error
"Medium: org.apache.pulsar.client.impl.ConsumerBase.NONE_KEY should be
package protected [org.apache.pulsar.client.impl.ConsumerBase]
At ConsumerBase.java:[line 887] MS_PKGPROTECT"
zymap pushed a commit to zymap/pulsar that referenced this pull request Feb 26, 2021
… the Key_Shared subscription. (apache#9329)

### Motivation

Currently, a consumer is pinged to a given listener thread to ensure the message is processed ordered for a topic. But for the Key_Shared subscription, the message process order is based on the message key, not the order of the topic. So this PR is able to handle messages with multiple listener threads for the Key_Shared subscription which also keeps the order of message key.

(cherry picked from commit e4523e0)
zymap pushed a commit that referenced this pull request Mar 1, 2021
… the Key_Shared subscription. (#9329)

Currently, a consumer is pinged to a given listener thread to ensure the message is processed ordered for a topic. But for the Key_Shared subscription, the message process order is based on the message key, not the order of the topic. So this PR is able to handle messages with multiple listener threads for the Key_Shared subscription which also keeps the order of message key.

(cherry picked from commit e4523e0)
@zymap zymap added the cherry-picked/branch-2.7 Archived: 2.7 is end of life label Mar 1, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cherry-picked/branch-2.7 Archived: 2.7 is end of life release/2.7.1 type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants