Skip to content

Commit

Permalink
ARTEMIS-4648 support typed properties from CLI producer
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram committed Mar 8, 2024
1 parent 063968b commit 6899a61
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public class Producer extends DestAbstract {
@Option(names = "--data", description = "Messages will be read from the specified file. Other message options will be ignored.")
String file = null;

@Option(names = "--properties", description = "The properties to set on the message in JSON, e.g.: [{\"type\":\"string\",\"key\":\"myKey1\",\"value\":\"myValue1\"},{\"type\":\"string\",\"key\":\"myKey2\",\"value\":\"myValue2\"}]. Valid types are boolean, byte, short, int, long, float, double, and string.")
String properties = null;

public boolean isNonpersistent() {
return nonpersistent;
}
Expand Down Expand Up @@ -88,6 +91,15 @@ public Producer setMessage(String message) {
return this;
}

public String getProperties() {
return properties;
}

public Producer setProperties(String properties) {
this.properties = properties;
return this;
}

public int getTextMessageSize() {
return textMessageSize;
}
Expand Down Expand Up @@ -205,6 +217,7 @@ public Object execute(ActionContext context) throws Exception {
.setMessageSize(messageSize)
.setTextMessageSize(textMessageSize)
.setMessage(message)
.setProperties(properties)
.setObjectSize(objectSize)
.setMsgTTL(msgTTL)
.setMsgGroupID(msgGroupID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.net.URL;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonObject;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.ReusableLatch;

public class ProducerThread extends Thread {
Expand All @@ -54,6 +58,7 @@ public class ProducerThread extends Thread {
int transactions = 0;
final AtomicLong sentCount = new AtomicLong(0);
String message = null;
String properties = null;
String messageText = null;
String payloadUrl = null;
byte[] payload = null;
Expand Down Expand Up @@ -193,9 +198,52 @@ protected Message createMessage(long i, String threadName) throws Exception {

answer.setLongProperty("count", i);
answer.setStringProperty("ThreadSent", threadName);

if (properties != null && properties.length() != 0) {
applyProperties(answer);
}

return answer;
}

protected void applyProperties(Message message) throws JMSException {
JsonArray propertyArray = JsonLoader.readArray(new StringReader(properties));
for (int j = 0; j < propertyArray.size(); j++) {
JsonObject propertyEntry = propertyArray.getJsonObject(j);
String type = propertyEntry.getString("type");
String key = propertyEntry.getString("key");
String value = propertyEntry.getString("value");
switch (type.toLowerCase()) {
case "boolean":
message.setBooleanProperty(key, Boolean.parseBoolean(value));
break;
case "int":
message.setIntProperty(key, Integer.parseInt(value));
break;
case "long":
message.setLongProperty(key, Long.parseLong(value));
break;
case "byte":
message.setByteProperty(key, Byte.parseByte(value));
break;
case "short":
message.setShortProperty(key, Short.parseShort(value));
break;
case "float":
message.setFloatProperty(key, Float.parseFloat(value));
break;
case "double":
message.setDoubleProperty(key, Double.parseDouble(value));
break;
case "string":
message.setStringProperty(key, value);
break;
default:
context.err.println("Unable to set property: " + key + ". Did not recognize type: " + type + ". Supported types are: boolean, int, long, byte, short, float, double, string.");
}
}
}

private String readInputStream(InputStream is, int size, long messageNumber) throws IOException {
try (InputStreamReader reader = new InputStreamReader(is)) {
char[] buffer;
Expand Down Expand Up @@ -333,6 +381,11 @@ public ProducerThread setMessage(String message) {
return this;
}

public ProducerThread setProperties(String properties) {
this.properties = properties;
return this;
}

public boolean isRunIndefinitely() {
return runIndefinitely;
}
Expand Down

0 comments on commit 6899a61

Please sign in to comment.