Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add single partition router #999

Merged
merged 10 commits into from
Jun 28, 2023

Conversation

crossoverJie
Copy link
Member

Motivation

image

https://pulsar.apache.org/docs/2.11.x/concepts-messaging/#routing-modes
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java

Based on the code of the Java client, Implemented the single-partition-router mentioned in the document.

Modifications

Add single-partition-router

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (yes) Add NewSinglePartitionRouter() method.
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (GoDocs)

@shibd
Copy link
Member

shibd commented Jun 15, 2023

@crossoverJie Hi, thanks for your contribution. Can you fix the ci lint failed?

@crossoverJie
Copy link
Member Author

crossoverJie commented Jun 15, 2023

@crossoverJie Hi, thanks for your contribution. Can you fix the ci lint failed?

PTAL

@crossoverJie
Copy link
Member Author

/pulsarbot run-failure-checks

@BewareMyPower
Copy link
Contributor

Please rebase to master to have tests fixed.

@crossoverJie
Copy link
Member Author

Please rebase to master to have tests fixed.

Done

shibd
shibd previously approved these changes Jun 21, 2023
}
mutex.Lock()
defer mutex.Unlock()
partition := r.R.Intn(int(numPartitions))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to judge again here if singlePartition is nil. Set it value only when it is nil.

Copy link
Member Author

@crossoverJie crossoverJie Jun 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would using sync.Once be better?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@shibd shibd self-requested a review June 21, 2023 08:20
@shibd shibd dismissed their stale review June 21, 2023 08:21

Misoperation approved

@nodece
Copy link
Member

nodece commented Jun 25, 2023

Thank you for your contribution! But the default router supports this feature:

if len(message.OrderingKey) != 0 {
// When an OrderingKey is specified, use the hash of that key
return int(hashFunc(message.OrderingKey) % numPartitions)
}
if len(message.Key) != 0 {
// When a key is specified, use the hash of that key
return int(hashFunc(message.Key) % numPartitions)
}

@crossoverJie
Copy link
Member Author

Thank you for your contribution! But the default router supports this feature:

if len(message.OrderingKey) != 0 {
// When an OrderingKey is specified, use the hash of that key
return int(hashFunc(message.OrderingKey) % numPartitions)
}
if len(message.Key) != 0 {
// When a key is specified, use the hash of that key
return int(hashFunc(message.Key) % numPartitions)
}

But when there are no OrderdingKey and Key, the user needs to specify specific partitions manually, here refers to the API of the Java SDK, where the SDK selects the partitions, keeping consistent with the implementation of the Java SDK.

https://github.com/apache/pulsar/blob/46b6dcd9571262f72f963a755c8a51fb4f66c81e/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java#L153C1-L155

import "sync"

var (
singlePartition *int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DO NOT use the global variable.

Please move these variables to line 28.

pulsar/single_partition_router.go Show resolved Hide resolved
pulsar/single_partition_router.go Outdated Show resolved Hide resolved
pulsar/producer_test.go Show resolved Hide resolved
pulsar/producer_test.go Outdated Show resolved Hide resolved
pulsar/single_partition_route_bench_test.go Outdated Show resolved Hide resolved
func NewSinglePartitionRouter() func(*ProducerMessage, TopicMetadata) int {
return func(message *ProducerMessage, metadata TopicMetadata) int {
numPartitions := metadata.NumPartitions()
if len(message.OrderingKey) != 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I notice that Java only checks the message.key, without message.orderingKey, so we should keep the same implementation.

crossoverJie and others added 2 commits June 25, 2023 14:54
Co-authored-by: Zixuan Liu <nodeces@gmail.com>
pulsar/producer_test.go Outdated Show resolved Hide resolved
@crossoverJie
Copy link
Member Author

CI run failed, I created another PR to fix this issue.

@shibd
Copy link
Member

shibd commented Jun 28, 2023

@nodece Hi, Can you take a look at it again?

@shibd shibd merged commit 3a4e5cf into apache:master Jun 28, 2023
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants