From f001b281865e3f9d84a8757609a52a3d55f9ff36 Mon Sep 17 00:00:00 2001 From: yuruguo Date: Thu, 9 Sep 2021 00:23:03 +0800 Subject: [PATCH 1/3] [testclient] Improve parameter checking in perf --- .../pulsar/client/impl/ClientBuilderImpl.java | 5 +++ .../client/impl/ProducerBuilderImpl.java | 3 ++ .../pulsar/client/impl/ReaderBuilderImpl.java | 1 + .../socket/client/PerformanceClient.java | 7 ++-- .../testclient/ManagedLedgerWriter.java | 8 ++--- .../pulsar/testclient/ParameterValidator.java | 33 +++++++++++++++++++ .../testclient/PerformanceConsumer.java | 9 ++--- .../testclient/PerformanceProducer.java | 13 ++++---- .../pulsar/testclient/PerformanceReader.java | 4 +-- 9 files changed, 64 insertions(+), 19 deletions(-) create mode 100644 pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ParameterValidator.java diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java index c8a4376f5ab65..0eca13e5af87a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -37,6 +37,8 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; +import static com.google.common.base.Preconditions.checkArgument; + public class ClientBuilderImpl implements ClientBuilder { ClientConfigurationData conf; @@ -152,6 +154,7 @@ public ClientBuilder lookupTimeout(int lookupTimeout, TimeUnit unit) { @Override public ClientBuilder ioThreads(int numIoThreads) { + checkArgument(numIoThreads > 0, "ioThreads needs to be > 0"); conf.setNumIoThreads(numIoThreads); return this; } @@ -164,6 +167,7 @@ public ClientBuilder listenerThreads(int numListenerThreads) { @Override public ClientBuilder connectionsPerBroker(int connectionsPerBroker) { + checkArgument(connectionsPerBroker >= 0, "connectionsPerBroker needs to be >= 0"); conf.setConnectionsPerBroker(connectionsPerBroker); return this; } @@ -242,6 +246,7 @@ public ClientBuilder tlsProtocols(Set tlsProtocols) { @Override public ClientBuilder statsInterval(long statsInterval, TimeUnit unit) { + checkArgument(statsInterval >= 0, "statsInterval needs to be >= 0"); conf.setStatsIntervalSeconds(unit.toSeconds(statsInterval)); return this; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java index d3a1bb13038fc..dc3d683890722 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java @@ -135,18 +135,21 @@ public ProducerBuilder producerName(String producerName) { @Override public ProducerBuilder sendTimeout(int sendTimeout, @NonNull TimeUnit unit) { + checkArgument(sendTimeout >= 0, "sendTimeout needs to be >= 0"); conf.setSendTimeoutMs(sendTimeout, unit); return this; } @Override public ProducerBuilder maxPendingMessages(int maxPendingMessages) { + checkArgument(maxPendingMessages >= 0, "maxPendingMessages needs to be >= 0"); conf.setMaxPendingMessages(maxPendingMessages); return this; } @Override public ProducerBuilder maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) { + checkArgument(maxPendingMessagesAcrossPartitions >= 0, "maxPendingMessagesAcrossPartitions needs to be >= 0"); conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions); return this; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index 4305bb6a45ed9..dfcba2f922b18 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -174,6 +174,7 @@ public ReaderBuilder cryptoFailureAction(ConsumerCryptoFailureAction action) @Override public ReaderBuilder receiverQueueSize(int receiverQueueSize) { + checkArgument(receiverQueueSize >= 0, "receiverQueueSize needs to be >= 0"); conf.setReceiverQueueSize(receiverQueueSize); return this; } diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java index 3902d5fbb2ea2..5f5c0f1b729ea 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java @@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.testclient.ParameterValidator; import org.apache.pulsar.testclient.PerfClientUtils; import org.apache.pulsar.testclient.utils.PaddingDecimalFormat; import org.slf4j.Logger; @@ -90,7 +91,7 @@ static class Arguments { @Parameter(names = { "-s", "--size" }, description = "Message size in byte") public int msgSize = 1024; - @Parameter(names = { "-t", "--num-topic" }, description = "Number of topics") + @Parameter(names = { "-t", "--num-topic" }, description = "Number of topics", validateWith = ParameterValidator.class) public int numTopics = 1; @Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name") @@ -104,14 +105,14 @@ static class Arguments { public String authParams; @Parameter(names = { "-m", - "--num-messages" }, description = "Number of messages to publish in total. If 0, it will keep publishing") + "--num-messages" }, description = "Number of messages to publish in total. If <= 0, it will keep publishing") public long numMessages = 0; @Parameter(names = { "-f", "--payload-file" }, description = "Use payload from a file instead of empty buffer") public String payloadFilename = null; @Parameter(names = { "-time", - "--test-duration" }, description = "Test duration in secs. If 0, it will keep publishing") + "--test-duration" }, description = "Test duration in secs. If <= 0, it will keep publishing") public long testTime = 0; } diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java index 1520c92013639..5e4a4ddca20c7 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java @@ -93,10 +93,10 @@ static class Arguments { @Parameter(names = { "-s", "--size" }, description = "Message size") public int msgSize = 1024; - @Parameter(names = { "-t", "--num-topic" }, description = "Number of managed ledgers") + @Parameter(names = { "-t", "--num-topic" }, description = "Number of managed ledgers", validateWith = ParameterValidator.class) public int numManagedLedgers = 1; - @Parameter(names = { "--threads" }, description = "Number of threads writing") + @Parameter(names = { "--threads" }, description = "Number of threads writing", validateWith = ParameterValidator.class) public int numThreads = 1; @Parameter(names = { "-zk", "--zookeeperServers" }, description = "ZooKeeper connection string", required = true) @@ -110,7 +110,7 @@ static class Arguments { public int maxConnections = 1; @Parameter(names = { "-m", - "--num-messages" }, description = "Number of messages to publish in total. If 0, it will keep publishing") + "--num-messages" }, description = "Number of messages to publish in total. If <= 0, it will keep publishing") public long numMessages = 0; @Parameter(names = { "-e", "--ensemble-size" }, description = "Ledger ensemble size") @@ -126,7 +126,7 @@ static class Arguments { public DigestType digestType = DigestType.CRC32C; @Parameter(names = { "-time", - "--test-duration" }, description = "Test duration in secs. If 0, it will keep publishing") + "--test-duration" }, description = "Test duration in secs. If <= 0, it will keep publishing") public long testTime = 0; } diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ParameterValidator.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ParameterValidator.java new file mode 100644 index 0000000000000..8a9bea583deec --- /dev/null +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ParameterValidator.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.testclient; + +import com.beust.jcommander.IParameterValidator; +import com.beust.jcommander.ParameterException; + +public class ParameterValidator implements IParameterValidator { + + @Override + public void validate(String name, String value) throws ParameterException { + if (Integer.parseInt(value) <= 0) { + throw new ParameterException("Parameter " + name + " should be > 0 (found " + value + ")"); + } + return; + } +} diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index 32bc8b3c00fc0..5e1dc3cc0529e 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -21,6 +21,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import com.beust.jcommander.IParameterValidator; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; @@ -82,13 +83,13 @@ static class Arguments { @Parameter(description = "persistent://prop/ns/my-topic", required = true) public List topic; - @Parameter(names = { "-t", "--num-topics" }, description = "Number of topics") + @Parameter(names = { "-t", "--num-topics" }, description = "Number of topics", validateWith = ParameterValidator.class) public int numTopics = 1; - @Parameter(names = { "-n", "--num-consumers" }, description = "Number of consumers (per subscription), only one consumer is allowed when subscriptionType is Exclusive") + @Parameter(names = { "-n", "--num-consumers" }, description = "Number of consumers (per subscription), only one consumer is allowed when subscriptionType is Exclusive", validateWith = ParameterValidator.class) public int numConsumers = 1; - @Parameter(names = { "-ns", "--num-subscriptions" }, description = "Number of subscriptions (per topic)") + @Parameter(names = { "-ns", "--num-subscriptions" }, description = "Number of subscriptions (per topic)", validateWith = ParameterValidator.class) public int numSubscriptions = 1; @Parameter(names = { "-s", "--subscriber-name" }, description = "Subscriber name prefix", hidden = true) @@ -166,7 +167,7 @@ static class Arguments { public String encKeyFile = null; @Parameter(names = { "-time", - "--test-duration" }, description = "Test duration in secs. If 0, it will keep consuming") + "--test-duration" }, description = "Test duration in secs. If <= 0, it will keep consuming") public long testTime = 0; @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " + diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index f198a2dc863f8..c29747796becb 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -22,6 +22,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import com.beust.jcommander.IParameterValidator; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; @@ -108,7 +109,7 @@ static class Arguments { @Parameter(description = "persistent://prop/ns/my-topic", required = true) public List topics; - @Parameter(names = { "-threads", "--num-test-threads" }, description = "Number of test threads") + @Parameter(names = { "-threads", "--num-test-threads" }, description = "Number of test threads", validateWith = ParameterValidator.class) public int numTestThreads = 1; @Parameter(names = { "-r", "--rate" }, description = "Publish rate msg/s across topics") @@ -117,10 +118,10 @@ static class Arguments { @Parameter(names = { "-s", "--size" }, description = "Message size (bytes)") public int msgSize = 1024; - @Parameter(names = { "-t", "--num-topic" }, description = "Number of topics") + @Parameter(names = { "-t", "--num-topic" }, description = "Number of topics", validateWith = ParameterValidator.class) public int numTopics = 1; - @Parameter(names = { "-n", "--num-producers" }, description = "Number of producers (per topic)") + @Parameter(names = { "-n", "--num-producers" }, description = "Number of producers (per topic)", validateWith = ParameterValidator.class) public int numProducers = 1; @Parameter(names = {"--separator"}, description = "Separator between the topic and topic number") @@ -169,7 +170,7 @@ static class Arguments { public int maxConnections = 100; @Parameter(names = { "-m", - "--num-messages" }, description = "Number of messages to publish in total. If 0, it will keep publishing") + "--num-messages" }, description = "Number of messages to publish in total. If <= 0, it will keep publishing") public long numMessages = 0; @Parameter(names = { "-i", @@ -201,7 +202,7 @@ static class Arguments { public int batchMaxBytes = 4 * 1024 * 1024; @Parameter(names = { "-time", - "--test-duration" }, description = "Test duration in secs. If 0, it will keep publishing") + "--test-duration" }, description = "Test duration in secs. If <= 0, it will keep publishing") public long testTime = 0; @Parameter(names = "--warmup-time", description = "Warm-up time in seconds (Default: 1 sec)") @@ -538,7 +539,7 @@ private static void runProducer(int producerId, producerBuilder.producerName(producerName); } - if (arguments.batchTimeMillis == 0.0 && arguments.batchMaxMessages == 0) { + if (arguments.batchTimeMillis <= 0.0 && arguments.batchMaxMessages <= 0) { producerBuilder.enableBatching(false); } else { long batchTimeUsec = (long) (arguments.batchTimeMillis * 1000); diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java index c4becf0a93f0e..d7a5dcd18da91 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java @@ -72,7 +72,7 @@ static class Arguments { @Parameter(description = "persistent://prop/ns/my-topic", required = true) public List topic; - @Parameter(names = { "-t", "--num-topics" }, description = "Number of topics") + @Parameter(names = { "-t", "--num-topics" }, description = "Number of topics", validateWith = ParameterValidator.class) public int numTopics = 1; @Parameter(names = { "-r", "--rate" }, description = "Simulate a slow message reader (rate in msg/s)") @@ -122,7 +122,7 @@ static class Arguments { public Boolean tlsAllowInsecureConnection = null; @Parameter(names = { "-time", - "--test-duration" }, description = "Test duration in secs. If 0, it will keep consuming") + "--test-duration" }, description = "Test duration in secs. If <= 0, it will keep consuming") public long testTime = 0; @Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " + From 55d67e9fbcd8a104c700c821a0233dce3cf93c50 Mon Sep 17 00:00:00 2001 From: yuruguo Date: Thu, 9 Sep 2021 01:15:49 +0800 Subject: [PATCH 2/3] Update doc --- site2/docs/reference-cli-tools.md | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md index d4ba2c59b4b07..5c7f67b23ad34 100644 --- a/site2/docs/reference-cli-tools.md +++ b/site2/docs/reference-cli-tools.md @@ -448,6 +448,7 @@ Options |`-ss`, `--subscriptions`|A list of subscriptions to consume on (e.g. sub1,sub2)|sub| |`-st`, `--subscription-type`|Subscriber type. Possible values are Exclusive, Shared, Failover, Key_Shared.|Exclusive| |`-sp`, `--subscription-position`|Subscriber position. Possible values are Latest, Earliest.|Latest| +|`-time`, `--test-duration`|Test duration in secs. If <= 0, it will keep consuming|0| |`--trust-cert-file`|Path for the trusted TLS certificate file|| |`--tls-allow-insecure`|Allow insecure TLS connection|| @@ -477,7 +478,7 @@ Options |`-c`, `--max-connections`|Max number of TCP connections to a single broker|100| |`-o`, `--max-outstanding`|Max number of outstanding messages|1000| |`-p`, `--max-outstanding-across-partitions`|Max number of outstanding messages across partitions|50000| -|`-m`, `--num-messages`|Number of messages to publish in total. If set to 0, it will keep publishing.|0| +|`-m`, `--num-messages`|Number of messages to publish in total. If <= 0, it will keep publishing.|0| |`-n`, `--num-producers`|The number of producers (per topic)|1| |`-t`, `--num-topic`|The number of topics|1| |`-f`, `--payload-file`|Use payload from an UTF-8 encoded text file and a payload will be randomly selected when publishing messages|| @@ -486,7 +487,7 @@ Options |`-u`, `--service-url`|Pulsar service URL|| |`-s`, `--size`|Message size (in bytes)|1024| |`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled.|0| -|`-time`, `--test-duration`|Test duration in secs. If set to 0, it will keep publishing.|0| +|`-time`, `--test-duration`|Test duration in secs. If <= 0, it will keep publishing.|0| |`--trust-cert-file`|Path for the trusted TLS certificate file|| |`--warmup-time`|Warm-up time in seconds|1| |`--tls-allow-insecure`|Allow insecure TLS connection|| @@ -515,6 +516,7 @@ Options |`-u`, `--service-url`|Pulsar service URL|| |`-m`, `--start-message-id`|Start message id. This can be either 'earliest', 'latest' or a specific message id by using 'lid:eid'|earliest| |`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled.|0| +|`-time`, `--test-duration`|Test duration in secs. If <= 0, it will keep consuming|0| |`--trust-cert-file`|Path for the trusted TLS certificate file|| |`--use-tls`|Use TLS encryption on the connection|false| |`--tls-allow-insecure`|Allow insecure TLS connection|| @@ -534,13 +536,13 @@ Options |`--auth_plugin`|Authentication plugin class name|| |`--conf-file`|Configuration file|| |`-h`, `--help`|Help message|false| -|`-m`, `--num-messages`|Number of messages to publish in total. If 0, it will keep publishing|0| +|`-m`, `--num-messages`|Number of messages to publish in total. If <= 0, it will keep publishing|0| |`-t`, `--num-topic`|The number of topics|1| |`-f`, `--payload-file`|Use payload from a file instead of empty buffer|| |`-u`, `--proxy-url`|Pulsar Proxy URL, e.g., "ws://localhost:8080/"|| |`-r`, `--rate`|Publish rate msg/s across topics|100| |`-s`, `--size`|Message size in byte|1024| -|`-time`, `--test-duration`|Test duration in secs. If 0, it will keep publishing|0| +|`-time`, `--test-duration`|Test duration in secs. If <= 0, it will keep publishing|0| ### `managed-ledger` @@ -560,11 +562,11 @@ Options |`-h`, `--help`|Help message|false| |`-c`, `--max-connections`|Max number of TCP connections to a single bookie|1| |`-o`, `--max-outstanding`|Max number of outstanding requests|1000| -|`-m`, `--num-messages`|Number of messages to publish in total. If 0, it will keep publishing|0| +|`-m`, `--num-messages`|Number of messages to publish in total. If <= 0, it will keep publishing|0| |`-t`, `--num-topic`|Number of managed ledgers|1| |`-r`, `--rate`|Write rate msg/s across managed ledgers|100| |`-s`, `--size`|Message size in byte|1024| -|`-time`, `--test-duration`|Test duration in secs. If 0, it will keep publishing|0| +|`-time`, `--test-duration`|Test duration in secs. If <= 0, it will keep publishing|0| |`--threads`|Number of threads writing|1| |`-w`, `--write-quorum`|Ledger write quorum|1| |`-zk`, `--zookeeperServers`|ZooKeeper connection string|| From 6d4aced9cb8668d51709d61d6049ff8e83ebeb66 Mon Sep 17 00:00:00 2001 From: yuruguo Date: Thu, 9 Sep 2021 19:49:20 +0800 Subject: [PATCH 3/3] [testclient] Improve parameter checking in pulsar-perf --- .../apache/pulsar/client/api/ClientBuilder.java | 2 +- .../pulsar/client/impl/ClientBuilderImpl.java | 3 ++- .../pulsar/client/impl/ProducerBuilderImpl.java | 3 --- .../impl/conf/ClientConfigurationData.java | 1 + .../proxy/socket/client/PerformanceClient.java | 4 ++-- .../pulsar/testclient/ManagedLedgerWriter.java | 4 ++-- .../pulsar/testclient/PerformanceConsumer.java | 7 +++---- .../pulsar/testclient/PerformanceProducer.java | 7 +++---- .../pulsar/testclient/PerformanceReader.java | 2 +- ...ava => PositiveNumberParameterValidator.java} | 3 +-- site2/docs/reference-cli-tools.md | 16 ++++++++-------- 11 files changed, 24 insertions(+), 28 deletions(-) rename pulsar-testclient/src/main/java/org/apache/pulsar/testclient/{ParameterValidator.java => PositiveNumberParameterValidator.java} (93%) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java index 42a63d37711b1..690dfc5793666 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java @@ -263,7 +263,7 @@ ClientBuilder authentication(String authPluginClassName, Map aut * Increasing this parameter may improve throughput when using many producers over a high latency connection. * * @param connectionsPerBroker - * max number of connections per broker (needs to be greater than 0) + * max number of connections per broker (needs to be greater than or equal to 0) * @return the client builder instance */ ClientBuilder connectionsPerBroker(int connectionsPerBroker); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java index 0eca13e5af87a..2a0542eb4f5b4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -142,6 +142,7 @@ public ClientBuilder authentication(String authPluginClassName, Map= 0, "operationTimeout needs to be >= 0"); conf.setOperationTimeoutMs(unit.toMillis(operationTimeout)); return this; } @@ -161,6 +162,7 @@ public ClientBuilder ioThreads(int numIoThreads) { @Override public ClientBuilder listenerThreads(int numListenerThreads) { + checkArgument(numListenerThreads > 0, "listenerThreads needs to be > 0"); conf.setNumListenerThreads(numListenerThreads); return this; } @@ -246,7 +248,6 @@ public ClientBuilder tlsProtocols(Set tlsProtocols) { @Override public ClientBuilder statsInterval(long statsInterval, TimeUnit unit) { - checkArgument(statsInterval >= 0, "statsInterval needs to be >= 0"); conf.setStatsIntervalSeconds(unit.toSeconds(statsInterval)); return this; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java index dc3d683890722..d3a1bb13038fc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java @@ -135,21 +135,18 @@ public ProducerBuilder producerName(String producerName) { @Override public ProducerBuilder sendTimeout(int sendTimeout, @NonNull TimeUnit unit) { - checkArgument(sendTimeout >= 0, "sendTimeout needs to be >= 0"); conf.setSendTimeoutMs(sendTimeout, unit); return this; } @Override public ProducerBuilder maxPendingMessages(int maxPendingMessages) { - checkArgument(maxPendingMessages >= 0, "maxPendingMessages needs to be >= 0"); conf.setMaxPendingMessages(maxPendingMessages); return this; } @Override public ProducerBuilder maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) { - checkArgument(maxPendingMessagesAcrossPartitions >= 0, "maxPendingMessagesAcrossPartitions needs to be >= 0"); conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions); return this; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index 60b3370fed8f7..799e61c86f12a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -122,6 +122,7 @@ public class ClientConfigurationData implements Serializable, Cloneable { @ApiModelProperty( name = "connectionsPerBroker", value = "Number of connections established between the client and each Broker." + + "A value of 0 means to disable connection pooling." ) private int connectionsPerBroker = 1; diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java index 5f5c0f1b729ea..468aa52222138 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java @@ -47,7 +47,7 @@ import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.testclient.ParameterValidator; +import org.apache.pulsar.testclient.PositiveNumberParameterValidator; import org.apache.pulsar.testclient.PerfClientUtils; import org.apache.pulsar.testclient.utils.PaddingDecimalFormat; import org.slf4j.Logger; @@ -91,7 +91,7 @@ static class Arguments { @Parameter(names = { "-s", "--size" }, description = "Message size in byte") public int msgSize = 1024; - @Parameter(names = { "-t", "--num-topic" }, description = "Number of topics", validateWith = ParameterValidator.class) + @Parameter(names = { "-t", "--num-topic" }, description = "Number of topics", validateWith = PositiveNumberParameterValidator.class) public int numTopics = 1; @Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name") diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java index 5e4a4ddca20c7..807cb13862998 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java @@ -93,10 +93,10 @@ static class Arguments { @Parameter(names = { "-s", "--size" }, description = "Message size") public int msgSize = 1024; - @Parameter(names = { "-t", "--num-topic" }, description = "Number of managed ledgers", validateWith = ParameterValidator.class) + @Parameter(names = { "-t", "--num-topic" }, description = "Number of managed ledgers", validateWith = PositiveNumberParameterValidator.class) public int numManagedLedgers = 1; - @Parameter(names = { "--threads" }, description = "Number of threads writing", validateWith = ParameterValidator.class) + @Parameter(names = { "--threads" }, description = "Number of threads writing", validateWith = PositiveNumberParameterValidator.class) public int numThreads = 1; @Parameter(names = { "-zk", "--zookeeperServers" }, description = "ZooKeeper connection string", required = true) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index 5e1dc3cc0529e..f22f637775ff9 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -21,7 +21,6 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; -import com.beust.jcommander.IParameterValidator; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; @@ -83,13 +82,13 @@ static class Arguments { @Parameter(description = "persistent://prop/ns/my-topic", required = true) public List topic; - @Parameter(names = { "-t", "--num-topics" }, description = "Number of topics", validateWith = ParameterValidator.class) + @Parameter(names = { "-t", "--num-topics" }, description = "Number of topics", validateWith = PositiveNumberParameterValidator.class) public int numTopics = 1; - @Parameter(names = { "-n", "--num-consumers" }, description = "Number of consumers (per subscription), only one consumer is allowed when subscriptionType is Exclusive", validateWith = ParameterValidator.class) + @Parameter(names = { "-n", "--num-consumers" }, description = "Number of consumers (per subscription), only one consumer is allowed when subscriptionType is Exclusive", validateWith = PositiveNumberParameterValidator.class) public int numConsumers = 1; - @Parameter(names = { "-ns", "--num-subscriptions" }, description = "Number of subscriptions (per topic)", validateWith = ParameterValidator.class) + @Parameter(names = { "-ns", "--num-subscriptions" }, description = "Number of subscriptions (per topic)", validateWith = PositiveNumberParameterValidator.class) public int numSubscriptions = 1; @Parameter(names = { "-s", "--subscriber-name" }, description = "Subscriber name prefix", hidden = true) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index c29747796becb..fe5b91c99bb9d 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -22,7 +22,6 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; -import com.beust.jcommander.IParameterValidator; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; @@ -109,7 +108,7 @@ static class Arguments { @Parameter(description = "persistent://prop/ns/my-topic", required = true) public List topics; - @Parameter(names = { "-threads", "--num-test-threads" }, description = "Number of test threads", validateWith = ParameterValidator.class) + @Parameter(names = { "-threads", "--num-test-threads" }, description = "Number of test threads", validateWith = PositiveNumberParameterValidator.class) public int numTestThreads = 1; @Parameter(names = { "-r", "--rate" }, description = "Publish rate msg/s across topics") @@ -118,10 +117,10 @@ static class Arguments { @Parameter(names = { "-s", "--size" }, description = "Message size (bytes)") public int msgSize = 1024; - @Parameter(names = { "-t", "--num-topic" }, description = "Number of topics", validateWith = ParameterValidator.class) + @Parameter(names = { "-t", "--num-topic" }, description = "Number of topics", validateWith = PositiveNumberParameterValidator.class) public int numTopics = 1; - @Parameter(names = { "-n", "--num-producers" }, description = "Number of producers (per topic)", validateWith = ParameterValidator.class) + @Parameter(names = { "-n", "--num-producers" }, description = "Number of producers (per topic)", validateWith = PositiveNumberParameterValidator.class) public int numProducers = 1; @Parameter(names = {"--separator"}, description = "Separator between the topic and topic number") diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java index d7a5dcd18da91..e4b24c1aa878e 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java @@ -72,7 +72,7 @@ static class Arguments { @Parameter(description = "persistent://prop/ns/my-topic", required = true) public List topic; - @Parameter(names = { "-t", "--num-topics" }, description = "Number of topics", validateWith = ParameterValidator.class) + @Parameter(names = { "-t", "--num-topics" }, description = "Number of topics", validateWith = PositiveNumberParameterValidator.class) public int numTopics = 1; @Parameter(names = { "-r", "--rate" }, description = "Simulate a slow message reader (rate in msg/s)") diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ParameterValidator.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PositiveNumberParameterValidator.java similarity index 93% rename from pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ParameterValidator.java rename to pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PositiveNumberParameterValidator.java index 8a9bea583deec..523d9c16f5ee3 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ParameterValidator.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PositiveNumberParameterValidator.java @@ -21,13 +21,12 @@ import com.beust.jcommander.IParameterValidator; import com.beust.jcommander.ParameterException; -public class ParameterValidator implements IParameterValidator { +public class PositiveNumberParameterValidator implements IParameterValidator { @Override public void validate(String name, String value) throws ParameterException { if (Integer.parseInt(value) <= 0) { throw new ParameterException("Parameter " + name + " should be > 0 (found " + value + ")"); } - return; } } diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md index 5c7f67b23ad34..7422825f9738e 100644 --- a/site2/docs/reference-cli-tools.md +++ b/site2/docs/reference-cli-tools.md @@ -448,7 +448,7 @@ Options |`-ss`, `--subscriptions`|A list of subscriptions to consume on (e.g. sub1,sub2)|sub| |`-st`, `--subscription-type`|Subscriber type. Possible values are Exclusive, Shared, Failover, Key_Shared.|Exclusive| |`-sp`, `--subscription-position`|Subscriber position. Possible values are Latest, Earliest.|Latest| -|`-time`, `--test-duration`|Test duration in secs. If <= 0, it will keep consuming|0| +|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps consuming messages.|0| |`--trust-cert-file`|Path for the trusted TLS certificate file|| |`--tls-allow-insecure`|Allow insecure TLS connection|| @@ -478,7 +478,7 @@ Options |`-c`, `--max-connections`|Max number of TCP connections to a single broker|100| |`-o`, `--max-outstanding`|Max number of outstanding messages|1000| |`-p`, `--max-outstanding-across-partitions`|Max number of outstanding messages across partitions|50000| -|`-m`, `--num-messages`|Number of messages to publish in total. If <= 0, it will keep publishing.|0| +|`-m`, `--num-messages`|Number of messages to publish in total. If this value is less than or equal to 0, it keeps publishing messages.|0| |`-n`, `--num-producers`|The number of producers (per topic)|1| |`-t`, `--num-topic`|The number of topics|1| |`-f`, `--payload-file`|Use payload from an UTF-8 encoded text file and a payload will be randomly selected when publishing messages|| @@ -487,7 +487,7 @@ Options |`-u`, `--service-url`|Pulsar service URL|| |`-s`, `--size`|Message size (in bytes)|1024| |`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled.|0| -|`-time`, `--test-duration`|Test duration in secs. If <= 0, it will keep publishing.|0| +|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps publishing messages.|0| |`--trust-cert-file`|Path for the trusted TLS certificate file|| |`--warmup-time`|Warm-up time in seconds|1| |`--tls-allow-insecure`|Allow insecure TLS connection|| @@ -516,7 +516,7 @@ Options |`-u`, `--service-url`|Pulsar service URL|| |`-m`, `--start-message-id`|Start message id. This can be either 'earliest', 'latest' or a specific message id by using 'lid:eid'|earliest| |`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled.|0| -|`-time`, `--test-duration`|Test duration in secs. If <= 0, it will keep consuming|0| +|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps consuming messages.|0| |`--trust-cert-file`|Path for the trusted TLS certificate file|| |`--use-tls`|Use TLS encryption on the connection|false| |`--tls-allow-insecure`|Allow insecure TLS connection|| @@ -536,13 +536,13 @@ Options |`--auth_plugin`|Authentication plugin class name|| |`--conf-file`|Configuration file|| |`-h`, `--help`|Help message|false| -|`-m`, `--num-messages`|Number of messages to publish in total. If <= 0, it will keep publishing|0| +|`-m`, `--num-messages`|Number of messages to publish in total. If this value is less than or equal to 0, it keeps publishing messages.|0| |`-t`, `--num-topic`|The number of topics|1| |`-f`, `--payload-file`|Use payload from a file instead of empty buffer|| |`-u`, `--proxy-url`|Pulsar Proxy URL, e.g., "ws://localhost:8080/"|| |`-r`, `--rate`|Publish rate msg/s across topics|100| |`-s`, `--size`|Message size in byte|1024| -|`-time`, `--test-duration`|Test duration in secs. If <= 0, it will keep publishing|0| +|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps publishing messages.|0| ### `managed-ledger` @@ -562,11 +562,11 @@ Options |`-h`, `--help`|Help message|false| |`-c`, `--max-connections`|Max number of TCP connections to a single bookie|1| |`-o`, `--max-outstanding`|Max number of outstanding requests|1000| -|`-m`, `--num-messages`|Number of messages to publish in total. If <= 0, it will keep publishing|0| +|`-m`, `--num-messages`|Number of messages to publish in total. If this value is less than or equal to 0, it keeps publishing messages.|0| |`-t`, `--num-topic`|Number of managed ledgers|1| |`-r`, `--rate`|Write rate msg/s across managed ledgers|100| |`-s`, `--size`|Message size in byte|1024| -|`-time`, `--test-duration`|Test duration in secs. If <= 0, it will keep publishing|0| +|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps publishing messages.|0| |`--threads`|Number of threads writing|1| |`-w`, `--write-quorum`|Ledger write quorum|1| |`-zk`, `--zookeeperServers`|ZooKeeper connection string||