-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pulsar-perf: add ability to create partitioned topics #9859
Conversation
/pulsarbot rerun-failure-checks |
try { | ||
client.topics().createPartitionedTopic(topic, arguments.createTopicPartitions); | ||
} catch (PulsarAdminException.ConflictException alreadyExists) { | ||
log.debug("Topic "+topic+" already exists: " + alreadyExists); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the topic already exists, we should check the partitions is equals to the user settings, otherwise, we might get the incorrect test result. I think we should stop the test client in this situdation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea, I will add this validation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@codelipenghui I added the validation, PTAL
thanks for your quick review
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log.debug
should be put in if (log.isDebugEnabled())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sijie done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work! It is better to add a unit test to cover the situation where Topic already exists
@315157973 I cannot do it easily/not in this PR. In order to make these tools "testable" we should run them in another way and add a way to not call "System.exit" I will be happy to follow up with an implementation for #9865 but I believe it is better to keep this PR focused on the task |
/pulsarbot rerun-failure-checks |
5 similar comments
/pulsarbot rerun-failure-checks |
/pulsarbot rerun-failure-checks |
/pulsarbot rerun-failure-checks |
/pulsarbot rerun-failure-checks |
/pulsarbot rerun-failure-checks |
@@ -140,6 +148,9 @@ | |||
@Parameter(names = { "-p", "--max-outstanding-across-partitions" }, description = "Max number of outstanding messages across partitions") | |||
public int maxPendingMessagesAcrossPartitions = DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS; | |||
|
|||
@Parameter(names = { "-np", "--partitions" }, description = "Create partitioned topics with the given number of partitions, set 0 to not try to create the topic") | |||
public int createTopicPartitions = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public int createTopicPartitions = 0; | |
public int partitions = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -140,6 +148,9 @@ | |||
@Parameter(names = { "-p", "--max-outstanding-across-partitions" }, description = "Max number of outstanding messages across partitions") | |||
public int maxPendingMessagesAcrossPartitions = DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS; | |||
|
|||
@Parameter(names = { "-np", "--partitions" }, description = "Create partitioned topics with the given number of partitions, set 0 to not try to create the topic") | |||
public int createTopicPartitions = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public int createTopicPartitions = 0; | |
public Integer createTopicPartitions = null; |
Please use boxed integer to indicate if it is going to create a partitioned topic or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea. done
try { | ||
client.topics().createPartitionedTopic(topic, arguments.createTopicPartitions); | ||
} catch (PulsarAdminException.ConflictException alreadyExists) { | ||
log.debug("Topic "+topic+" already exists: " + alreadyExists); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log.debug
should be put in if (log.isDebugEnabled())
pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
Outdated
Show resolved
Hide resolved
354d89b
to
b68fb62
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sijie I have addressed all of your comments and answered to the System.exit question.
Thanks
try { | ||
client.topics().createPartitionedTopic(topic, arguments.createTopicPartitions); | ||
} catch (PulsarAdminException.ConflictException alreadyExists) { | ||
log.debug("Topic "+topic+" already exists: " + alreadyExists); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sijie done
pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
Outdated
Show resolved
Hide resolved
@@ -140,6 +148,9 @@ | |||
@Parameter(names = { "-p", "--max-outstanding-across-partitions" }, description = "Max number of outstanding messages across partitions") | |||
public int maxPendingMessagesAcrossPartitions = DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS; | |||
|
|||
@Parameter(names = { "-np", "--partitions" }, description = "Create partitioned topics with the given number of partitions, set 0 to not try to create the topic") | |||
public int createTopicPartitions = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea. done
@@ -140,6 +148,9 @@ | |||
@Parameter(names = { "-p", "--max-outstanding-across-partitions" }, description = "Max number of outstanding messages across partitions") | |||
public int maxPendingMessagesAcrossPartitions = DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS; | |||
|
|||
@Parameter(names = { "-np", "--partitions" }, description = "Create partitioned topics with the given number of partitions, set 0 to not try to create the topic") | |||
public int createTopicPartitions = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
/pulsarbot rerun-failure-checks |
3b2b4a2
to
6c53047
Compare
6c53047
to
c932361
Compare
Motivation
When you are using pulsar-perf you have to pre-create the topic if you want it to be partitioned.
This is very awkward and error prone.
Modifications
This patch adds the ability to set a number of partitions.
Before starting the main procedure we are going to create the topics as partitioned topics.
I needed to add support to pass explicitly the http service url, because by default the serverURL is the binary protocol URL and it is not suitable for PulsarAdmin. The default value is http://localhost:8080.
I added support for not calling System.exit and execute a custom callback, otherwise it is not possible to execute reliably the tests without crashing the JVM.
Verifying this change
This change added tests and can be verified bu testing manually the new option.
Documentation