Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
* Increasing this parameter may improve throughput when using many producers over a high latency connection.
*
* @param connectionsPerBroker
* max number of connections per broker (needs to be greater than 0)
* max number of connections per broker (needs to be greater than or equal to 0)
* @return the client builder instance
*/
ClientBuilder connectionsPerBroker(int connectionsPerBroker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;

import static com.google.common.base.Preconditions.checkArgument;

public class ClientBuilderImpl implements ClientBuilder {
ClientConfigurationData conf;

Expand Down Expand Up @@ -140,6 +142,7 @@ public ClientBuilder authentication(String authPluginClassName, Map<String, Stri

@Override
public ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit) {
checkArgument(operationTimeout >= 0, "operationTimeout needs to be >= 0");
conf.setOperationTimeoutMs(unit.toMillis(operationTimeout));
return this;
}
Expand All @@ -152,18 +155,21 @@ public ClientBuilder lookupTimeout(int lookupTimeout, TimeUnit unit) {

@Override
public ClientBuilder ioThreads(int numIoThreads) {
checkArgument(numIoThreads > 0, "ioThreads needs to be > 0");
conf.setNumIoThreads(numIoThreads);
return this;
}

@Override
public ClientBuilder listenerThreads(int numListenerThreads) {
checkArgument(numListenerThreads > 0, "listenerThreads needs to be > 0");
conf.setNumListenerThreads(numListenerThreads);
return this;
}

@Override
public ClientBuilder connectionsPerBroker(int connectionsPerBroker) {
checkArgument(connectionsPerBroker >= 0, "connectionsPerBroker needs to be >= 0");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per the java doc on the ClientBuilder interface, the value must be greater than 0.

Suggested change
checkArgument(connectionsPerBroker >= 0, "connectionsPerBroker needs to be >= 0");
checkArgument(connectionsPerBroker > 0, "connectionsPerBroker needs to be > 0");

Copy link
Contributor Author

@yuruguo yuruguo Sep 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. PTAL again, THX!

I have reverted,see review

conf.setConnectionsPerBroker(connectionsPerBroker);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public ReaderBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action)

@Override
public ReaderBuilder<T> receiverQueueSize(int receiverQueueSize) {
checkArgument(receiverQueueSize >= 0, "receiverQueueSize needs to be >= 0");
conf.setReceiverQueueSize(receiverQueueSize);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public class ClientConfigurationData implements Serializable, Cloneable {
@ApiModelProperty(
name = "connectionsPerBroker",
value = "Number of connections established between the client and each Broker."
+ "A value of 0 means to disable connection pooling."
)
private int connectionsPerBroker = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.testclient.PositiveNumberParameterValidator;
import org.apache.pulsar.testclient.PerfClientUtils;
import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
import org.slf4j.Logger;
Expand Down Expand Up @@ -90,7 +91,7 @@ static class Arguments {
@Parameter(names = { "-s", "--size" }, description = "Message size in byte")
public int msgSize = 1024;

@Parameter(names = { "-t", "--num-topic" }, description = "Number of topics")
@Parameter(names = { "-t", "--num-topic" }, description = "Number of topics", validateWith = PositiveNumberParameterValidator.class)
public int numTopics = 1;

@Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name")
Expand All @@ -104,14 +105,14 @@ static class Arguments {
public String authParams;

@Parameter(names = { "-m",
"--num-messages" }, description = "Number of messages to publish in total. If 0, it will keep publishing")
"--num-messages" }, description = "Number of messages to publish in total. If <= 0, it will keep publishing")
public long numMessages = 0;

@Parameter(names = { "-f", "--payload-file" }, description = "Use payload from a file instead of empty buffer")
public String payloadFilename = null;

@Parameter(names = { "-time",
"--test-duration" }, description = "Test duration in secs. If 0, it will keep publishing")
"--test-duration" }, description = "Test duration in secs. If <= 0, it will keep publishing")
public long testTime = 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ static class Arguments {
@Parameter(names = { "-s", "--size" }, description = "Message size")
public int msgSize = 1024;

@Parameter(names = { "-t", "--num-topic" }, description = "Number of managed ledgers")
@Parameter(names = { "-t", "--num-topic" }, description = "Number of managed ledgers", validateWith = PositiveNumberParameterValidator.class)
public int numManagedLedgers = 1;

@Parameter(names = { "--threads" }, description = "Number of threads writing")
@Parameter(names = { "--threads" }, description = "Number of threads writing", validateWith = PositiveNumberParameterValidator.class)
public int numThreads = 1;

@Parameter(names = { "-zk", "--zookeeperServers" }, description = "ZooKeeper connection string", required = true)
Expand All @@ -110,7 +110,7 @@ static class Arguments {
public int maxConnections = 1;

@Parameter(names = { "-m",
"--num-messages" }, description = "Number of messages to publish in total. If 0, it will keep publishing")
"--num-messages" }, description = "Number of messages to publish in total. If <= 0, it will keep publishing")
public long numMessages = 0;

@Parameter(names = { "-e", "--ensemble-size" }, description = "Ledger ensemble size")
Expand All @@ -126,7 +126,7 @@ static class Arguments {
public DigestType digestType = DigestType.CRC32C;

@Parameter(names = { "-time",
"--test-duration" }, description = "Test duration in secs. If 0, it will keep publishing")
"--test-duration" }, description = "Test duration in secs. If <= 0, it will keep publishing")
public long testTime = 0;

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ static class Arguments {
@Parameter(description = "persistent://prop/ns/my-topic", required = true)
public List<String> topic;

@Parameter(names = { "-t", "--num-topics" }, description = "Number of topics")
@Parameter(names = { "-t", "--num-topics" }, description = "Number of topics", validateWith = PositiveNumberParameterValidator.class)
public int numTopics = 1;

@Parameter(names = { "-n", "--num-consumers" }, description = "Number of consumers (per subscription), only one consumer is allowed when subscriptionType is Exclusive")
@Parameter(names = { "-n", "--num-consumers" }, description = "Number of consumers (per subscription), only one consumer is allowed when subscriptionType is Exclusive", validateWith = PositiveNumberParameterValidator.class)
public int numConsumers = 1;

@Parameter(names = { "-ns", "--num-subscriptions" }, description = "Number of subscriptions (per topic)")
@Parameter(names = { "-ns", "--num-subscriptions" }, description = "Number of subscriptions (per topic)", validateWith = PositiveNumberParameterValidator.class)
public int numSubscriptions = 1;

@Parameter(names = { "-s", "--subscriber-name" }, description = "Subscriber name prefix", hidden = true)
Expand Down Expand Up @@ -166,7 +166,7 @@ static class Arguments {
public String encKeyFile = null;

@Parameter(names = { "-time",
"--test-duration" }, description = "Test duration in secs. If 0, it will keep consuming")
"--test-duration" }, description = "Test duration in secs. If <= 0, it will keep consuming")
public long testTime = 0;

@Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ static class Arguments {
@Parameter(description = "persistent://prop/ns/my-topic", required = true)
public List<String> topics;

@Parameter(names = { "-threads", "--num-test-threads" }, description = "Number of test threads")
@Parameter(names = { "-threads", "--num-test-threads" }, description = "Number of test threads", validateWith = PositiveNumberParameterValidator.class)
public int numTestThreads = 1;

@Parameter(names = { "-r", "--rate" }, description = "Publish rate msg/s across topics")
Expand All @@ -117,10 +117,10 @@ static class Arguments {
@Parameter(names = { "-s", "--size" }, description = "Message size (bytes)")
public int msgSize = 1024;

@Parameter(names = { "-t", "--num-topic" }, description = "Number of topics")
@Parameter(names = { "-t", "--num-topic" }, description = "Number of topics", validateWith = PositiveNumberParameterValidator.class)
public int numTopics = 1;

@Parameter(names = { "-n", "--num-producers" }, description = "Number of producers (per topic)")
@Parameter(names = { "-n", "--num-producers" }, description = "Number of producers (per topic)", validateWith = PositiveNumberParameterValidator.class)
public int numProducers = 1;

@Parameter(names = {"--separator"}, description = "Separator between the topic and topic number")
Expand Down Expand Up @@ -169,7 +169,7 @@ static class Arguments {
public int maxConnections = 100;

@Parameter(names = { "-m",
"--num-messages" }, description = "Number of messages to publish in total. If 0, it will keep publishing")
"--num-messages" }, description = "Number of messages to publish in total. If <= 0, it will keep publishing")
public long numMessages = 0;

@Parameter(names = { "-i",
Expand Down Expand Up @@ -201,7 +201,7 @@ static class Arguments {
public int batchMaxBytes = 4 * 1024 * 1024;

@Parameter(names = { "-time",
"--test-duration" }, description = "Test duration in secs. If 0, it will keep publishing")
"--test-duration" }, description = "Test duration in secs. If <= 0, it will keep publishing")
public long testTime = 0;

@Parameter(names = "--warmup-time", description = "Warm-up time in seconds (Default: 1 sec)")
Expand Down Expand Up @@ -538,7 +538,7 @@ private static void runProducer(int producerId,
producerBuilder.producerName(producerName);
}

if (arguments.batchTimeMillis == 0.0 && arguments.batchMaxMessages == 0) {
if (arguments.batchTimeMillis <= 0.0 && arguments.batchMaxMessages <= 0) {
producerBuilder.enableBatching(false);
} else {
long batchTimeUsec = (long) (arguments.batchTimeMillis * 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ static class Arguments {
@Parameter(description = "persistent://prop/ns/my-topic", required = true)
public List<String> topic;

@Parameter(names = { "-t", "--num-topics" }, description = "Number of topics")
@Parameter(names = { "-t", "--num-topics" }, description = "Number of topics", validateWith = PositiveNumberParameterValidator.class)
public int numTopics = 1;

@Parameter(names = { "-r", "--rate" }, description = "Simulate a slow message reader (rate in msg/s)")
Expand Down Expand Up @@ -122,7 +122,7 @@ static class Arguments {
public Boolean tlsAllowInsecureConnection = null;

@Parameter(names = { "-time",
"--test-duration" }, description = "Test duration in secs. If 0, it will keep consuming")
"--test-duration" }, description = "Test duration in secs. If <= 0, it will keep consuming")
public long testTime = 0;

@Parameter(names = {"-ioThreads", "--num-io-threads"}, description = "Set the number of threads to be " +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.testclient;

import com.beust.jcommander.IParameterValidator;
import com.beust.jcommander.ParameterException;

public class PositiveNumberParameterValidator implements IParameterValidator {

@Override
public void validate(String name, String value) throws ParameterException {
if (Integer.parseInt(value) <= 0) {
throw new ParameterException("Parameter " + name + " should be > 0 (found " + value + ")");
}
}
}
14 changes: 8 additions & 6 deletions site2/docs/reference-cli-tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ Options
|`-ss`, `--subscriptions`|A list of subscriptions to consume on (e.g. sub1,sub2)|sub|
|`-st`, `--subscription-type`|Subscriber type. Possible values are Exclusive, Shared, Failover, Key_Shared.|Exclusive|
|`-sp`, `--subscription-position`|Subscriber position. Possible values are Latest, Earliest.|Latest|
|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps consuming messages.|0|
|`--trust-cert-file`|Path for the trusted TLS certificate file||
|`--tls-allow-insecure`|Allow insecure TLS connection||

Expand Down Expand Up @@ -477,7 +478,7 @@ Options
|`-c`, `--max-connections`|Max number of TCP connections to a single broker|100|
|`-o`, `--max-outstanding`|Max number of outstanding messages|1000|
|`-p`, `--max-outstanding-across-partitions`|Max number of outstanding messages across partitions|50000|
|`-m`, `--num-messages`|Number of messages to publish in total. If set to 0, it will keep publishing.|0|
|`-m`, `--num-messages`|Number of messages to publish in total. If this value is less than or equal to 0, it keeps publishing messages.|0|
|`-n`, `--num-producers`|The number of producers (per topic)|1|
|`-t`, `--num-topic`|The number of topics|1|
|`-f`, `--payload-file`|Use payload from an UTF-8 encoded text file and a payload will be randomly selected when publishing messages||
Expand All @@ -486,7 +487,7 @@ Options
|`-u`, `--service-url`|Pulsar service URL||
|`-s`, `--size`|Message size (in bytes)|1024|
|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled.|0|
|`-time`, `--test-duration`|Test duration in secs. If set to 0, it will keep publishing.|0|
|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps publishing messages.|0|
|`--trust-cert-file`|Path for the trusted TLS certificate file||
|`--warmup-time`|Warm-up time in seconds|1|
|`--tls-allow-insecure`|Allow insecure TLS connection||
Expand Down Expand Up @@ -515,6 +516,7 @@ Options
|`-u`, `--service-url`|Pulsar service URL||
|`-m`, `--start-message-id`|Start message id. This can be either 'earliest', 'latest' or a specific message id by using 'lid:eid'|earliest|
|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, statistics will be disabled.|0|
|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps consuming messages.|0|
|`--trust-cert-file`|Path for the trusted TLS certificate file||
|`--use-tls`|Use TLS encryption on the connection|false|
|`--tls-allow-insecure`|Allow insecure TLS connection||
Expand All @@ -534,13 +536,13 @@ Options
|`--auth_plugin`|Authentication plugin class name||
|`--conf-file`|Configuration file||
|`-h`, `--help`|Help message|false|
|`-m`, `--num-messages`|Number of messages to publish in total. If 0, it will keep publishing|0|
|`-m`, `--num-messages`|Number of messages to publish in total. If this value is less than or equal to 0, it keeps publishing messages.|0|
|`-t`, `--num-topic`|The number of topics|1|
|`-f`, `--payload-file`|Use payload from a file instead of empty buffer||
|`-u`, `--proxy-url`|Pulsar Proxy URL, e.g., "ws://localhost:8080/"||
|`-r`, `--rate`|Publish rate msg/s across topics|100|
|`-s`, `--size`|Message size in byte|1024|
|`-time`, `--test-duration`|Test duration in secs. If 0, it will keep publishing|0|
|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps publishing messages.|0|


### `managed-ledger`
Expand All @@ -560,11 +562,11 @@ Options
|`-h`, `--help`|Help message|false|
|`-c`, `--max-connections`|Max number of TCP connections to a single bookie|1|
|`-o`, `--max-outstanding`|Max number of outstanding requests|1000|
|`-m`, `--num-messages`|Number of messages to publish in total. If 0, it will keep publishing|0|
|`-m`, `--num-messages`|Number of messages to publish in total. If this value is less than or equal to 0, it keeps publishing messages.|0|
|`-t`, `--num-topic`|Number of managed ledgers|1|
|`-r`, `--rate`|Write rate msg/s across managed ledgers|100|
|`-s`, `--size`|Message size in byte|1024|
|`-time`, `--test-duration`|Test duration in secs. If 0, it will keep publishing|0|
|`-time`, `--test-duration`|Test duration (in seconds). If this value is less than or equal to 0, it keeps publishing messages.|0|
|`--threads`|Number of threads writing|1|
|`-w`, `--write-quorum`|Ledger write quorum|1|
|`-zk`, `--zookeeperServers`|ZooKeeper connection string||
Expand Down