Skip to content

Commit

Permalink
ARTEMIS-4648 support typed properties from CLI producer
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram committed Feb 29, 2024
1 parent b10e752 commit 83dff1b
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public class Producer extends DestAbstract {
@Option(names = "--data", description = "Messages will be read from the specified file. Other message options will be ignored.")
String file = null;

@Option(names = "--properties", description = "The properties to set on the message in JSON, e.g.: [{\"type\":\"string\",\"key\":\"myKey1\",\"value\":\"myValue1\"},{\"type\":\"string\",\"key\":\"myKey2\",\"value\":\"myValue2\"}]. Valid types are boolean, byte, short, int, long, float, double, and string.")
String properties = null;

public boolean isNonpersistent() {
return nonpersistent;
}
Expand Down Expand Up @@ -88,6 +91,15 @@ public Producer setMessage(String message) {
return this;
}

public String getProperties() {
return properties;
}

public Producer setProperties(String properties) {
this.properties = properties;
return this;
}

public int getTextMessageSize() {
return textMessageSize;
}
Expand Down Expand Up @@ -205,6 +217,7 @@ public Object execute(ActionContext context) throws Exception {
.setMessageSize(messageSize)
.setTextMessageSize(textMessageSize)
.setMessage(message)
.setProperties(properties)
.setObjectSize(objectSize)
.setMsgTTL(msgTTL)
.setMsgGroupID(msgGroupID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.net.URL;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonObject;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.ReusableLatch;

public class ProducerThread extends Thread {
Expand All @@ -52,6 +56,7 @@ public class ProducerThread extends Thread {
int transactions = 0;
final AtomicLong sentCount = new AtomicLong(0);
String message = null;
String properties = null;
String messageText = null;
String payloadUrl = null;
byte[] payload = null;
Expand Down Expand Up @@ -190,6 +195,45 @@ protected Message createMessage(long i, String threadName) throws Exception {

answer.setLongProperty("count", i);
answer.setStringProperty("ThreadSent", threadName);

if (properties != null && properties.length() != 0) {
JsonArray propertyArray = JsonLoader.readArray(new StringReader(properties));
for (int j = 0; j < propertyArray.size(); j++) {
JsonObject propertyEntry = propertyArray.getJsonObject(j);
String type = propertyEntry.getString("type");
String key = propertyEntry.getString("key");
String value = propertyEntry.getString("value");
switch (type.toLowerCase()) {
case "boolean":
answer.setBooleanProperty(key, Boolean.parseBoolean(value));
break;
case "int":
answer.setIntProperty(key, Integer.parseInt(value));
break;
case "long":
answer.setLongProperty(key, Long.parseLong(value));
break;
case "byte":
answer.setByteProperty(key, Byte.parseByte(value));
break;
case "short":
answer.setShortProperty(key, Short.parseShort(value));
break;
case "float":
answer.setFloatProperty(key, Float.parseFloat(value));
break;
case "double":
answer.setDoubleProperty(key, Double.parseDouble(value));
break;
case "string":
answer.setStringProperty(key, value);
break;
default:
System.err.println("Unable to set property: " + key + ". Did not recognize type: " + type + ". Supported types are: boolean, int, long, byte, short, float, double, string.");
}
}
}

return answer;
}

Expand Down Expand Up @@ -330,6 +374,11 @@ public ProducerThread setMessage(String message) {
return this;
}

public ProducerThread setProperties(String properties) {
this.properties = properties;
return this;
}

public boolean isRunIndefinitely() {
return runIndefinitely;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,22 @@
*/
package org.apache.activemq.cli.test;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.List;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.cli.commands.messages.Producer;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class CliProducerTest extends CliTestBase {
Expand All @@ -54,9 +56,10 @@ public void tearDown() throws Exception {
super.tearDown();
}

private void produceMessages(String address, String message, int msgCount) throws Exception {
new Producer()
private Object produceMessages(String address, String message, int msgCount, String properties) throws Exception {
return new Producer()
.setMessage(message)
.setProperties(properties)
.setMessageCount(msgCount)
.setDestination(address)
.setUser("admin")
Expand All @@ -65,13 +68,13 @@ private void produceMessages(String address, String message, int msgCount) throw
}

private void produceMessages(String address, int msgCount) throws Exception {
produceMessages(address, null, msgCount);
produceMessages(address, null, msgCount, null);
}

private void checkSentMessages(Session session, String address, String messageBody) throws Exception {
final boolean isCustomMessageBody = messageBody != null;

List<Message> received = consumeMessages(session, address, TEST_MESSAGE_COUNT, CompositeAddress.isFullyQualified(address));
List<Message> received = consumeMessages(session, address, TEST_MESSAGE_COUNT);
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
if (!isCustomMessageBody) messageBody = "test message: " + String.valueOf(i);
assertEquals(messageBody, ((TextMessage) received.get(i)).getText());
Expand All @@ -88,6 +91,138 @@ public void testSendMessage() throws Exception {
checkSentMessages(session, address, null);
}

@Test
public void testBooleanMessageProperties() throws Exception {
Message m;
String address = "test";
Session session = createSession(connection);

assertEquals(1L, produceMessages(address, null, 1, "[{'type':'boolean','key':'myTrueBoolean','value':'true'},{'type':'boolean','key':'myFalseBoolean','value':'false'}]".replaceAll("'", "\"")));

m = consumeMessages(session, address, 1).get(0);
Assert.assertTrue(m.propertyExists("myTrueBoolean"));
Assert.assertTrue(m.getBooleanProperty("myTrueBoolean"));
Assert.assertTrue(m.propertyExists("myFalseBoolean"));
Assert.assertFalse(m.getBooleanProperty("myFalseBoolean"));
}

@Test
public void testIntMessageProperties() throws Exception {
Message m;
String address = "test";
Session session = createSession(connection);

assertEquals(1L, produceMessages(address, null, 1, ("[{'type':'int','key':'myInt','value':'" + Integer.MAX_VALUE + "'}]").replaceAll("'", "\"")));

m = consumeMessages(session, address, 1).get(0);
Assert.assertTrue(m.propertyExists("myInt"));
Assert.assertEquals(Integer.MAX_VALUE, m.getIntProperty("myInt"));

assertEquals(0L, produceMessages(address, null, 1, ("[{'type':'int','key':'myInt','value':'" + Integer.MAX_VALUE + 1 + "'}]").replaceAll("'", "\"")));
}

@Test
public void testLongMessageProperties() throws Exception {
Message m;
String address = "test";
Session session = createSession(connection);

assertEquals(1L, produceMessages(address, null, 1, ("[{'type':'long','key':'myLong','value':'" + Long.MAX_VALUE + "'}]").replaceAll("'", "\"")));

m = consumeMessages(session, address, 1).get(0);
Assert.assertTrue(m.propertyExists("myLong"));
Assert.assertEquals(Long.MAX_VALUE, m.getLongProperty("myLong"));

assertEquals(0L, produceMessages(address, null, 1, ("[{'type':'long','key':'myLong','value':'" + Long.MAX_VALUE + 1 + "'}]").replaceAll("'", "\"")));
}

@Test
public void testByteMessageProperties() throws Exception {
Message m;
String address = "test";
Session session = createSession(connection);

assertEquals(1L, produceMessages(address, null, 1, "[{'type':'byte','key':'myByte','value':'127'}]".replaceAll("'", "\"")));

m = consumeMessages(session, address, 1).get(0);
Assert.assertTrue(m.propertyExists("myByte"));
Assert.assertEquals((byte) 127, m.getByteProperty("myByte"));

assertEquals(0L, produceMessages(address, null, 1, "[{'type':'byte','key':'myByte','value':'128'}]".replaceAll("'", "\"")));
}

@Test
public void testShortMessageProperties() throws Exception {
Message m;
String address = "test";
Session session = createSession(connection);

assertEquals(1L, produceMessages(address, null, 1, ("[{'type':'short','key':'myShort','value':'" + Short.MAX_VALUE + "'}]").replaceAll("'", "\"")));

m = consumeMessages(session, address, 1).get(0);
Assert.assertTrue(m.propertyExists("myShort"));
Assert.assertEquals(Short.MAX_VALUE, m.getShortProperty("myShort"));

assertEquals(0L, produceMessages(address, null, 1, ("[{'type':'short','key':'myShort','value':'" + Short.MAX_VALUE + 1 + "'}]").replaceAll("'", "\"")));
}

@Test
public void testFloatMessageProperties() throws Exception {
Message m;
String address = "test";
Session session = createSession(connection);

assertEquals(1L, produceMessages(address, null, 1, ("[{'type':'float','key':'myFloat','value':'" + Float.MAX_VALUE + "'}]").replaceAll("'", "\"")));

m = consumeMessages(session, address, 1).get(0);
Assert.assertTrue(m.propertyExists("myFloat"));
Assert.assertEquals(Float.MAX_VALUE, m.getFloatProperty("myFloat"), 0.0);

assertEquals(0L, produceMessages(address, null, 1, ("[{'type':'float','key':'myFloat','value':'badFloat'}]").replaceAll("'", "\"")));
}

@Test
public void testDoubleMessageProperties() throws Exception {
Message m;
String address = "test";
Session session = createSession(connection);

assertEquals(1L, produceMessages(address, null, 1, ("[{'type':'double','key':'myDouble','value':'" + Double.MAX_VALUE + "'}]").replaceAll("'", "\"")));

m = consumeMessages(session, address, 1).get(0);
Assert.assertTrue(m.propertyExists("myDouble"));
Assert.assertEquals(Double.MAX_VALUE, m.getDoubleProperty("myDouble"), 0.0);

assertEquals(0L, produceMessages(address, null, 1, ("[{'type':'double','key':'myDouble','value':'badDouble'}]").replaceAll("'", "\"")));
}

@Test
public void testStringMessageProperties() throws Exception {
Message m;
String address = "test";
Session session = createSession(connection);

assertEquals(1L, produceMessages(address, null, 1, "[{'type':'string','key':'myString','value':'foo'}]".replaceAll("'", "\"")));

m = consumeMessages(session, address, 1).get(0);
Assert.assertTrue(m.propertyExists("myString"));
Assert.assertEquals("foo", m.getStringProperty("myString"));
}

@Test
public void testBadMessageProperties() throws Exception {
PrintStream originalErr = System.err;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream newErr = new PrintStream(baos);
System.setErr(newErr);
try {
produceMessages("test", null, 1, "[{'type':'myType','key':'myKey','value':'myValue'}]".replaceAll("'", "\""));
assertEquals("Unable to set property: myKey. Did not recognize type: myType. Supported types are: boolean, int, long, byte, short, float, double, string.\n", baos.toString());
} finally {
System.setErr(originalErr);
}
}

@Test
public void testSendMessageFQQN() throws Exception {
String address = "test";
Expand All @@ -112,7 +247,7 @@ public void testSendMessageCustomBodyFQQN() throws Exception {
createQueue(RoutingType.MULTICAST, address, queue);
Session session = createSession(connection);

produceMessages("topic://" + address, messageBody, TEST_MESSAGE_COUNT);
produceMessages("topic://" + address, messageBody, TEST_MESSAGE_COUNT, null);

checkSentMessages(session, fqqn, messageBody);
}
Expand All @@ -124,7 +259,7 @@ public void testSendMessageWithCustomBody() throws Exception {

Session session = createSession(connection);

produceMessages(address, messageBody, TEST_MESSAGE_COUNT);
produceMessages(address, messageBody, TEST_MESSAGE_COUNT, null);

checkSentMessages(session, address, messageBody);
}
Expand All @@ -136,7 +271,7 @@ public void testSendMessageWithCustomBodyLongString() throws Exception {

Session session = createSession(connection);

produceMessages(address, messageBody, TEST_MESSAGE_COUNT);
produceMessages(address, messageBody, TEST_MESSAGE_COUNT, null);

checkSentMessages(session, address, messageBody);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.spi.core.security.jaas.PropertiesLoader;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -142,9 +143,9 @@ protected Session createSession(Connection connection) throws JMSException {
return session;
}

protected List<Message> consumeMessages(Session session, String address, int noMessages, boolean fqqn) throws Exception {
protected List<Message> consumeMessages(Session session, String address, int noMessages) throws Exception {
List<Message> messages = new ArrayList<>();
Destination destination = fqqn ? session.createQueue(address) : getDestination(address);
Destination destination = CompositeAddress.isFullyQualified(address) ? session.createQueue(address) : getDestination(address);

try (MessageConsumer consumer = session.createConsumer(destination)) {
for (int i = 0; i < noMessages; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private void checkSentMessages(Session session, List<Message> messages, String a
}

private void checkSentMessages(Session session, List<Message> messages, String address, String key) throws Exception {
List<Message> received = consumeMessages(session, address, TEST_MESSAGE_COUNT, CompositeAddress.isFullyQualified(address));
List<Message> received = consumeMessages(session, address, TEST_MESSAGE_COUNT);
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
Message m = messages.get(i);
if (m instanceof TextMessage) {
Expand Down

0 comments on commit 83dff1b

Please sign in to comment.