diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
index 8b32ad906eac4..f84f344ba5d69 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
@@ -73,6 +73,7 @@ public void testInitialization() throws InterruptedException, ExecutionException
Properties properties = new Properties();
properties.setProperty("serviceUrl", brokerUrl.toString());
properties.setProperty("useTls", "false");
+ properties.setProperty("memoryLimit", "10M");
String tenantName = UUID.randomUUID().toString();
@@ -94,6 +95,7 @@ public void testInitialization() throws InterruptedException, ExecutionException
String[] args = { "consume", "-t", "Exclusive", "-s", "sub-name", "-n",
Integer.toString(numberOfMessages), "--hex", "-r", "30", topicName };
Assert.assertEquals(pulsarClientToolConsumer.run(args), 0);
+ Assert.assertEquals(pulsarClientToolConsumer.rootParams.memoryLimit, 10 * 1024 * 1024);
future.complete(null);
} catch (Throwable t) {
future.completeExceptionally(t);
@@ -108,6 +110,7 @@ public void testInitialization() throws InterruptedException, ExecutionException
String[] args = { "produce", "--messages", "Have a nice day", "-n", Integer.toString(numberOfMessages), "-r",
"20", "-p", "key1=value1", "-p", "key2=value2", "-k", "partition_key", topicName };
Assert.assertEquals(pulsarClientToolProducer.run(args), 0);
+ Assert.assertEquals(pulsarClientToolProducer.rootParams.memoryLimit, 10 * 1024 * 1024);
future.get();
}
@@ -342,22 +345,49 @@ public void testArgs() throws Exception {
final String message = "test msg";
final int numberOfMessages = 1;
final String topicName = getTopicWithRandomSuffix("test-topic");
+ final String memoryLimitArg = "10M";
String[] args = {"--url", url,
"--auth-plugin", authPlugin,
"--auth-params", authParams,
"--tlsTrustCertsFilePath", CA_CERT_FILE_PATH,
+ "--memory-limit", memoryLimitArg,
"produce", "-m", message,
"-n", Integer.toString(numberOfMessages), topicName};
pulsarClientTool.jcommander.parse(args);
assertEquals(pulsarClientTool.rootParams.getTlsTrustCertsFilePath(), CA_CERT_FILE_PATH);
assertEquals(pulsarClientTool.rootParams.getAuthParams(), authParams);
assertEquals(pulsarClientTool.rootParams.getAuthPluginClassName(), authPlugin);
+ assertEquals(pulsarClientTool.rootParams.getMemoryLimit(), 10 * 1024 * 1024);
assertEquals(pulsarClientTool.rootParams.getServiceURL(), url);
assertNull(pulsarClientTool.rootParams.getProxyServiceURL());
assertNull(pulsarClientTool.rootParams.getProxyProtocol());
}
+ @Test(timeOut = 20000)
+ public void testMemoryLimitArgShortName() throws Exception {
+ PulsarClientTool pulsarClientTool = new PulsarClientTool(new Properties());
+ final String url = "pulsar+ssl://localhost:6651";
+ final String authPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationTls";
+ final String authParams = String.format("tlsCertFile:%s,tlsKeyFile:%s", getTlsFileForClient("admin.cert"),
+ getTlsFileForClient("admin.key-pk8"));
+ final String message = "test msg";
+ final int numberOfMessages = 1;
+ final String topicName = getTopicWithRandomSuffix("test-topic");
+ final String memoryLimitArg = "10M";
+
+ String[] args = {"--url", url,
+ "--auth-plugin", authPlugin,
+ "--auth-params", authParams,
+ "--tlsTrustCertsFilePath", CA_CERT_FILE_PATH,
+ "-ml", memoryLimitArg,
+ "produce", "-m", message,
+ "-n", Integer.toString(numberOfMessages), topicName};
+
+ pulsarClientTool.jcommander.parse(args);
+ assertEquals(pulsarClientTool.rootParams.getMemoryLimit(), 10 * 1024 * 1024);
+ }
+
@Test
public void testParsingProxyServiceUrlAndProxyProtocolFromProperties() throws Exception {
Properties properties = new Properties();
diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml
index 136d9a596ae2e..4a35523131ad2 100644
--- a/pulsar-client-tools/pom.xml
+++ b/pulsar-client-tools/pom.xml
@@ -67,6 +67,11 @@
pulsar-client-messagecrypto-bc
${project.version}
+
+ ${project.groupId}
+ pulsar-cli-utils
+ ${project.version}
+
org.asynchttpclient
async-http-client
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
index c64d80f380b9f..4057bbe9fdfd8 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
@@ -32,6 +32,7 @@
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.cli.converters.ByteUnitToLongConverter;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
@@ -40,7 +41,6 @@
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.api.SizeUnit;
-
public class PulsarClientTool {
@Getter
@@ -76,6 +76,10 @@ public static class RootParams {
@Parameter(names = { "--tlsTrustCertsFilePath" }, description = "File path to client trust certificates")
String tlsTrustCertsFilePath;
+
+ @Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit "
+ + "(eg: 32M, 64M)", converter = ByteUnitToLongConverter.class)
+ long memoryLimit = 0L;
}
protected RootParams rootParams;
@@ -151,6 +155,11 @@ protected void initRootParamsFromProperties(Properties properties) {
this.rootParams.authParams = properties.getProperty("authParams");
this.rootParams.tlsTrustCertsFilePath = properties.getProperty("tlsTrustCertsFilePath");
this.rootParams.proxyServiceURL = StringUtils.trimToNull(properties.getProperty("proxyServiceUrl"));
+ // setting memory limit
+ this.rootParams.memoryLimit = StringUtils.isNotEmpty(properties.getProperty("memoryLimit"))
+ ? new ByteUnitToLongConverter("memoryLimit").convert(properties.getProperty("memoryLimit"))
+ : this.rootParams.memoryLimit;
+
String proxyProtocolString = StringUtils.trimToNull(properties.getProperty("proxyProtocol"));
if (proxyProtocolString != null) {
try {
@@ -165,7 +174,7 @@ protected void initRootParamsFromProperties(Properties properties) {
private void updateConfig() throws UnsupportedAuthenticationException {
ClientBuilder clientBuilder = PulsarClient.builder()
- .memoryLimit(0, SizeUnit.BYTES);
+ .memoryLimit(rootParams.memoryLimit, SizeUnit.BYTES);
Authentication authentication = null;
if (isNotBlank(this.rootParams.authPluginClassName)) {
authentication = AuthenticationFactory.create(rootParams.authPluginClassName, rootParams.authParams);
diff --git a/pulsar-testclient/pom.xml b/pulsar-testclient/pom.xml
index 598fe477be1be..a13b2db33104b 100644
--- a/pulsar-testclient/pom.xml
+++ b/pulsar-testclient/pom.xml
@@ -85,6 +85,12 @@
${project.version}
+
+ ${project.groupId}
+ pulsar-cli-utils
+ ${project.version}
+
+
commons-configuration
commons-configuration
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java
index 64330ae2eeea1..982c71ce6a5f4 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java
@@ -37,6 +37,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import org.apache.pulsar.cli.converters.ByteUnitToLongConverter;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
@@ -180,6 +181,10 @@ private static class MainArguments {
@Parameter(names = { "--service-url" }, description = "Pulsar Service URL", required = true)
public String serviceURL;
+
+ @Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit "
+ + "(eg: 32M, 64M)", converter = ByteUnitToLongConverter.class)
+ public long memoryLimit = 0L;
}
// Configuration class for initializing or modifying TradeUnits.
@@ -318,7 +323,7 @@ public LoadSimulationClient(final MainArguments arguments) throws Exception {
.serviceHttpUrl(arguments.serviceURL)
.build();
client = PulsarClient.builder()
- .memoryLimit(0, SizeUnit.BYTES)
+ .memoryLimit(arguments.memoryLimit, SizeUnit.BYTES)
.serviceUrl(arguments.serviceURL)
.connectionsPerBroker(4)
.ioThreads(Runtime.getRuntime().availableProcessors())
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
index b312ceb6e3eff..3b44023ef503e 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
@@ -30,6 +30,7 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.slf4j.Logger;
@@ -66,6 +67,7 @@ public static ClientBuilder createClientBuilderFromArguments(PerformanceBaseArgu
throws PulsarClientException.UnsupportedAuthenticationException {
ClientBuilder clientBuilder = PulsarClient.builder()
+ .memoryLimit(arguments.memoryLimit, SizeUnit.BYTES)
.serviceUrl(arguments.serviceURL)
.connectionsPerBroker(arguments.maxConnections)
.ioThreads(arguments.ioThreads)
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java
index 5ae79fb0bf9a4..bc4ab003c4670 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java
@@ -28,6 +28,7 @@
import java.util.Properties;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.cli.converters.ByteUnitToLongConverter;
import org.apache.pulsar.client.api.ProxyProtocol;
/**
@@ -103,6 +104,10 @@ public abstract class PerformanceBaseArguments {
@Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name", hidden = true)
public String deprecatedAuthPluginClassName;
+ @Parameter(names = { "-ml", "--memory-limit", }, description = "Configure the Pulsar client memory limit "
+ + "(eg: 32M, 64M)", converter = ByteUnitToLongConverter.class)
+ public long memoryLimit;
+
public abstract void fillArgumentsFromProperties(Properties prop);
@SneakyThrows
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 59dabc9302622..9bd74be3aa859 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -47,6 +47,7 @@
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
@@ -229,6 +230,7 @@ public static void main(String[] args) throws Exception {
long testEndTime = startTime + (long) (arguments.testTime * 1e9);
ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments)
+ .memoryLimit(arguments.memoryLimit, SizeUnit.BYTES)
.enableTransaction(arguments.isEnableTransaction);
PulsarClient pulsarClient = clientBuilder.build();
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 63e3e2ec6fd23..e57d6ca225123 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -69,6 +69,7 @@
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -512,6 +513,7 @@ private static void runProducer(int producerId,
ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments)
+ .memoryLimit(arguments.memoryLimit, SizeUnit.BYTES)
.enableTransaction(arguments.isEnableTransaction);
client = clientBuilder.build();
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index ed5cc37644a31..6174caad1f938 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -40,6 +40,7 @@
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
@@ -140,6 +141,7 @@ public static void main(String[] args) throws Exception {
};
ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments)
+ .memoryLimit(arguments.memoryLimit, SizeUnit.BYTES)
.enableTls(arguments.useTls);
PulsarClient pulsarClient = clientBuilder.build();
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
index 469e6ab1f3fd6..3b422452d6401 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
@@ -58,6 +58,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
@@ -223,6 +224,7 @@ public static void main(String[] args)
}
ClientBuilder clientBuilder = PerfClientUtils.createClientBuilderFromArguments(arguments)
+ .memoryLimit(arguments.memoryLimit, SizeUnit.BYTES)
.enableTransaction(!arguments.isDisableTransaction);
PulsarClient client = clientBuilder.build();
diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java
index 42c93be343074..699f138bfdaa8 100644
--- a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceBaseArgumentsTest.java
@@ -18,20 +18,20 @@
*/
package org.apache.pulsar.testclient;
+import static org.apache.pulsar.client.api.ProxyProtocol.SNI;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
-
import org.testng.Assert;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import static org.apache.pulsar.client.api.ProxyProtocol.SNI;
-import static org.testng.Assert.fail;
-
-
public class PerformanceBaseArgumentsTest {
@Test
@@ -158,4 +158,74 @@ public void fillArgumentsFromProperties(Properties prop) {
tempConfigFile.delete();
}
}
+
+ @DataProvider(name = "memoryLimitCliArgumentProvider")
+ public Object[][] memoryLimitCliArgumentProvider() {
+ return new Object[][] {
+ { new String[]{"-ml","1"}, 1L},
+ { new String[]{"-ml","1K"}, 1024L},
+ { new String[]{"--memory-limit", "1G"}, 1024 * 1024 * 1024}
+ };
+ }
+
+ @Test(dataProvider = "memoryLimitCliArgumentProvider")
+ public void testMemoryLimitCliArgument(String[] cliArgs, long expectedMemoryLimit) {
+ for (String cmd : List.of(
+ "pulsar-perf read",
+ "pulsar-perf produce",
+ "pulsar-perf consume",
+ "pulsar-perf transaction"
+ )) {
+ // Arrange
+ AtomicBoolean called = new AtomicBoolean();
+ final PerformanceBaseArguments baseArgument = new PerformanceBaseArguments() {
+ @Override
+ public void fillArgumentsFromProperties(Properties prop) {
+ called.set(true);
+ }
+ };
+ baseArgument.confFile = "./src/test/resources/perf_client1.conf";
+
+ // Act
+ baseArgument.parseCLI(cmd, cliArgs);
+
+ // Assert
+ assertEquals(baseArgument.memoryLimit, expectedMemoryLimit);
+ }
+ }
+
+ @DataProvider(name = "invalidMemoryLimitCliArgumentProvider")
+ public Object[][] invalidMemoryLimitCliArgumentProvider() {
+ return new Object[][] {
+ { new String[]{"-ml","-1"}},
+ { new String[]{"-ml","1C"}},
+ { new String[]{"--memory-limit", "1Q"}}
+ };
+ }
+
+ @Test
+ public void testMemoryLimitCliArgumentDefault() {
+ for (String cmd : List.of(
+ "pulsar-perf read",
+ "pulsar-perf produce",
+ "pulsar-perf consume",
+ "pulsar-perf transaction"
+ )) {
+ // Arrange
+ AtomicBoolean called = new AtomicBoolean();
+ final PerformanceBaseArguments baseArgument = new PerformanceBaseArguments() {
+ @Override
+ public void fillArgumentsFromProperties(Properties prop) {
+ called.set(true);
+ }
+ };
+ baseArgument.confFile = "./src/test/resources/perf_client1.conf";
+
+ // Act
+ baseArgument.parseCLI(cmd, new String[]{});
+
+ // Assert
+ assertEquals(baseArgument.memoryLimit, 0L);
+ }
+ }
}
diff --git a/src/check-binary-license.sh b/src/check-binary-license.sh
index 3a6d266345f30..4b48da2061c3a 100755
--- a/src/check-binary-license.sh
+++ b/src/check-binary-license.sh
@@ -41,7 +41,7 @@ if [ -z $TARBALL ]; then
exit 1
fi
-JARS=$(tar -tf $TARBALL | grep '\.jar' | grep -v 'trino/' | grep -v '/examples/' | grep -v '/instances/' | grep -v pulsar-client | grep -v pulsar-common | grep -v pulsar-package | grep -v pulsar-websocket | grep -v bouncy-castle-bc | sed 's!.*/!!' | sort)
+JARS=$(tar -tf $TARBALL | grep '\.jar' | grep -v 'trino/' | grep -v '/examples/' | grep -v '/instances/' | grep -v pulsar-client | grep -v pulsar-cli-utils | grep -v pulsar-common | grep -v pulsar-package | grep -v pulsar-websocket | grep -v bouncy-castle-bc | sed 's!.*/!!' | sort)
LICENSEPATH=$(tar -tf $TARBALL | awk '/^[^\/]*\/LICENSE/')
LICENSE=$(tar -O -xf $TARBALL "$LICENSEPATH")