Skip to content
Permalink
Browse files
ARTEMIS-3827 anon OpenWire producer may lose sent msg
  • Loading branch information
AntonRoskvist authored and jbertram committed May 23, 2022
1 parent 5ca0d72 commit 548747c71dbe5defdad2af057abaffe5747eb8fa
Showing 3 changed files with 62 additions and 1 deletion.
@@ -368,7 +368,7 @@ private void act(Command command) {
setLastCommand(command);
response = command.visit(commandProcessorInstance);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn("Errors occurred during the buffering operation ", e);
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
if (responseRequired) {
response = convertException(e);
}
@@ -1683,6 +1683,9 @@ public Response processMessage(Message messageSend) throws Exception {
} catch (Exception e) {
if (tx != null) {
tx.markAsRollbackOnly(new ActiveMQException(e.getMessage()));
} else if (e instanceof ActiveMQNonExistentQueueException && producerInfo.getDestination() == null) {
//Send exception for non transacted anonymous producers using an incorrect destination
sendException(e);
}
throw e;
} finally {
@@ -39,6 +39,7 @@
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -250,6 +251,10 @@ private boolean checkAutoCreateQueue(SimpleString queueName, boolean isTemporary
coreSession.createQueue(new QueueConfiguration(queueNameToUse).setAddress(addressToUse).setRoutingType(routingTypeToUse).setTemporary(isTemporary).setAutoCreated(true).setFilterString(filter));
connection.addKnownDestination(queueName);
} else {
if (server.getAddressInfo(queueName) == null) {
//Address does not exist and will not get autocreated
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName);
}
hasQueue = false;
}
}
@@ -21,8 +21,11 @@
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.command.ActiveMQDestination;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import javax.jms.Connection;
import javax.jms.MessageConsumer;
@@ -144,4 +147,54 @@ public void testAutoDeleteNegative() throws Exception {
Wait.assertTrue(() -> server.locateQueue(new SimpleString("trash")) != null);
Wait.assertTrue(() -> server.getAddressInfo(new SimpleString("trash")) != null);
}

@Rule
public ExpectedException thrown = ExpectedException.none();

@Test()
public void testSendFailsWithoutAutoCreate() throws Exception {
thrown.expect(javax.jms.JMSException.class);

Connection connection = null;
try {
AddressSettings setting = new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false);
server.getAddressSettingsRepository().addMatch("WRONG.#", setting);

connection = factory.createConnection("admin", "password");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = ActiveMQDestination.createDestination("WRONG.QUEUE", ActiveMQDestination.QUEUE_TYPE);

final MessageProducer producer = session.createProducer(null);
producer.send(destination, session.createTextMessage("foo"));

} finally {
if (connection != null) {
connection.close();
}
}
}

@Test()
public void testTransactedSendFailsWithoutAutoCreate() throws Exception {
thrown.expect(javax.jms.JMSException.class);

Connection connection = null;
try {
AddressSettings setting = new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false);
server.getAddressSettingsRepository().addMatch("WRONG.#", setting);

connection = factory.createConnection("admin", "password");
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = ActiveMQDestination.createDestination("WRONG.QUEUE", ActiveMQDestination.QUEUE_TYPE);

final MessageProducer producer = session.createProducer(null);
producer.send(destination, session.createTextMessage("foo"));
session.commit();

} finally {
if (connection != null) {
connection.close();
}
}
}
}

0 comments on commit 548747c

Please sign in to comment.