Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ public boolean addAddressInfo(AddressInfo addressInfo) throws Exception {

private boolean internalAddressInfo(AddressInfo addressInfo, boolean reload) throws Exception {
synchronized (addressLock) {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeAddAddress(addressInfo, reload) : null);
boolean result;
if (reload) {
result = addressManager.reloadAddressInfo(addressInfo);
Expand All @@ -445,6 +446,7 @@ private boolean internalAddressInfo(AddressInfo addressInfo, boolean reload) thr
if (!addressInfo.isInternal()) {
managementService.registerAddress(addressInfo);
}
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterAddAddress(addressInfo, reload) : null);
} catch (Exception e) {
e.printStackTrace();
}
Expand Down Expand Up @@ -517,23 +519,28 @@ public QueueBinding updateQueue(SimpleString name,
@Override
public AddressInfo updateAddressInfo(SimpleString addressName,
EnumSet<RoutingType> routingTypes) throws Exception {

synchronized (addressLock) {
return addressManager.updateAddressInfo(addressName, routingTypes);
}

server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeUpdateAddress(addressName, routingTypes) : null);
final AddressInfo address = addressManager.updateAddressInfo(addressName, routingTypes);
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterUpdateAddress(address) : null);

return address;
}
}

@Override
public AddressInfo removeAddressInfo(SimpleString address) throws Exception {
synchronized (addressLock) {
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.beforeRemoveAddress(address) : null);
Bindings bindingsForAddress = getBindingsForAddress(address);
if (bindingsForAddress.getBindings().size() > 0) {
throw ActiveMQMessageBundle.BUNDLE.addressHasBindings(address);
}
managementService.unregisterAddress(address);
return addressManager.removeAddressInfo(address);
final AddressInfo addressInfo = addressManager.removeAddressInfo(address);
server.callBrokerPlugins(server.hasBrokerPlugins() ? plugin -> plugin.afterRemoveAddress(address, addressInfo) : null);

return addressInfo;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1860,6 +1860,8 @@ public void destroyQueue(final SimpleString queueName,

queue.deleteQueue(removeConsumers);

callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount,
removeConsumers, autoDeleteAddress) : null);
AddressInfo addressInfo = getAddressInfo(address);

if (autoDeleteAddress && postOffice != null && addressInfo != null && addressInfo.isAutoCreated()) {
Expand All @@ -1871,9 +1873,6 @@ public void destroyQueue(final SimpleString queueName,
}

callPostQueueDeletionCallbacks(address, queueName);

callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterDestroyQueue(queue, address, session, checkConsumerCount,
removeConsumers, autoDeleteAddress) : null);
}

@Override
Expand Down Expand Up @@ -2777,10 +2776,10 @@ public Queue createQueue(final AddressInfo addrInfo,
managementService.registerQueue(queue, queue.getAddress(), storageManager);
}

callPostQueueCreationCallbacks(queue.getName());

callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateQueue(queue) : null);

callPostQueueCreationCallbacks(queue.getName());

return queue;
}

Expand Down Expand Up @@ -2882,10 +2881,10 @@ public Queue createQueue(final SimpleString address,

managementService.registerQueue(queue, queue.getAddress(), storageManager);

callPostQueueCreationCallbacks(queue.getName());

callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateQueue(queue) : null);

callPostQueueCreationCallbacks(queue.getName());

return queue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.activemq.artemis.core.server.plugin;

import java.util.EnumSet;
import java.util.Map;

import org.apache.activemq.artemis.api.core.ActiveMQException;
Expand All @@ -36,6 +37,7 @@
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
Expand Down Expand Up @@ -213,6 +215,71 @@ default void afterCloseConsumer(ServerConsumer consumer, boolean failed) throws

}

/**
* Before an address is added tot he broker
*
* @param addressInfo The addressInfo that will be added
* @param reload If the address is being reloaded
* @throws ActiveMQException
*/
default void beforeAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException {

}

/**
* After an address has been added tot he broker
*
* @param addressInfo The newly added address
* @param reload If the address is being reloaded
* @throws ActiveMQException
*/
default void afterAddAddress(AddressInfo addressInfo, boolean reload) throws ActiveMQException {

}


/**
* Before an address is updated
*
* @param address The existing address info that is about to be updated
* @param routingTypes The new routing types that the address will be updated with
* @throws ActiveMQException
*/
default void beforeUpdateAddress(SimpleString address, EnumSet<RoutingType> routingTypes) throws ActiveMQException {

}

/**
* After an address has been updated
*
* @param addressInfo The newly updated address info
* @throws ActiveMQException
*/
default void afterUpdateAddress(AddressInfo addressInfo) throws ActiveMQException {

}

/**
* Before an address is removed
*
* @param address The address that will be removed
* @throws ActiveMQException
*/
default void beforeRemoveAddress(SimpleString address) throws ActiveMQException {

}

/**
* After an address has been removed
*
* @param address The address that has been removed
* @param addressInfo The address info that has been removed or null if not removed
* @throws ActiveMQException
*/
default void afterRemoveAddress(SimpleString address, AddressInfo addressInfo) throws ActiveMQException {

}

/**
* Before a queue is created
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.plugin;

import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
Expand All @@ -26,7 +27,9 @@
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_REMOVE_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_ADD_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
Expand All @@ -35,6 +38,7 @@
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DELIVER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_REMOVE_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;
Expand Down Expand Up @@ -100,7 +104,23 @@ public void testQueueReceiverReadAndAckMessage() throws Exception {
BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION, AFTER_CLOSE_SESSION,
BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND,
AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER);
AFTER_SEND, BEFORE_MESSAGE_ROUTE, AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER,
BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS);
}

@Test(timeout = 60000)
public void testQueueReceiverAutoCreatedQueue() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();

AmqpReceiver receiver = session.createReceiver("autoCreated");
receiver.close();
connection.close();

verifier.validatePluginMethodsAtLeast(1, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS);
//should fire once for the auto created address being removed
verifier.validatePluginMethodsEquals(1, BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.plugin;

import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_ADD_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CREATE_CONNECTION;
Expand All @@ -27,8 +28,11 @@
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_CONNECTION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_DESTROY_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_REMOVE_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_SESSION_METADATA_ADDED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_UPDATE_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_ADD_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_CONSUMER;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CLOSE_SESSION;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_CREATE_CONSUMER;
Expand All @@ -38,8 +42,10 @@
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DEPLOY_BRIDGE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_DESTROY_QUEUE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_MESSAGE_ROUTE;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_REMOVE_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SEND;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_SESSION_METADATA_ADDED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.BEFORE_UPDATE_ADDRESS;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_ACKED;
import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.MESSAGE_EXPIRED;

Expand All @@ -54,7 +60,9 @@
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
Expand All @@ -65,6 +73,7 @@
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.junit.Before;
Expand Down Expand Up @@ -116,11 +125,12 @@ public void testSendReceive() throws Exception {
conn.close();

verifier.validatePluginMethodsEquals(0, MESSAGE_EXPIRED, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE,
BEFORE_DESTROY_QUEUE, AFTER_DESTROY_QUEUE);
BEFORE_DESTROY_QUEUE, AFTER_DESTROY_QUEUE, BEFORE_UPDATE_ADDRESS, AFTER_UPDATE_ADDRESS,
BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS);
verifier.validatePluginMethodsEquals(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE,
AFTER_MESSAGE_ROUTE);
AFTER_MESSAGE_ROUTE, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS);
verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
AFTER_CLOSE_SESSION);

Expand All @@ -139,7 +149,42 @@ public void testDestroyQueue() throws Exception {
server.destroyQueue(new SimpleString(queue.getQueueName()));

verifier.validatePluginMethodsEquals(1, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, BEFORE_DESTROY_QUEUE,
AFTER_DESTROY_QUEUE);
AFTER_DESTROY_QUEUE, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS);
}

@Test
public void testAutoCreateQueue() throws Exception {
conn = cf.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue autoCreatedQueue = sess.createQueue("autoCreatedQueue");
sess.createConsumer(autoCreatedQueue);
conn.close();

verifier.validatePluginMethodsEquals(1, BEFORE_DESTROY_QUEUE,
AFTER_DESTROY_QUEUE, BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS);

verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, BEFORE_ADD_ADDRESS,
AFTER_ADD_ADDRESS);
}

@Test
public void testAutoCreateTopic() throws Exception {
conn = cf.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic autoCreatedTopic = sess.createTopic("autoCreatedTopic");
sess.createConsumer(autoCreatedTopic);
conn.close();

//before/add address called just once to remove autocreated destination
verifier.validatePluginMethodsEquals(1, BEFORE_DESTROY_QUEUE,
AFTER_DESTROY_QUEUE, BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS);

//Before/Add address are called twice because of the autocreated destination and the
//created destination in the before method
verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, BEFORE_ADD_ADDRESS,
AFTER_ADD_ADDRESS);
}

@Test
Expand All @@ -162,12 +207,13 @@ public void testMessageExpireServer() throws Exception {

conn.close();

verifier.validatePluginMethodsEquals(0, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
verifier.validatePluginMethodsEquals(0, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE,
BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS);
verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
BEFORE_SESSION_METADATA_ADDED, AFTER_SESSION_METADATA_ADDED,
BEFORE_SESSION_METADATA_ADDED, AFTER_SESSION_METADATA_ADDED, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS,
BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE,
AFTER_MESSAGE_ROUTE, MESSAGE_EXPIRED);
AFTER_MESSAGE_ROUTE, MESSAGE_EXPIRED, BEFORE_ADD_ADDRESS, AFTER_ADD_ADDRESS);
verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
AFTER_CLOSE_SESSION);

Expand All @@ -194,11 +240,13 @@ public void testMessageExpireClient() throws Exception {

conn.close();

verifier.validatePluginMethodsEquals(0, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE);
verifier.validatePluginMethodsEquals(0, BEFORE_DEPLOY_BRIDGE, AFTER_DEPLOY_BRIDGE,
BEFORE_REMOVE_ADDRESS, AFTER_REMOVE_ADDRESS);
verifier.validatePluginMethodsAtLeast(1, AFTER_CREATE_CONNECTION, AFTER_DESTROY_CONNECTION,
BEFORE_CREATE_CONSUMER, AFTER_CREATE_CONSUMER, BEFORE_CLOSE_CONSUMER, AFTER_CLOSE_CONSUMER,
BEFORE_CREATE_QUEUE, AFTER_CREATE_QUEUE, MESSAGE_ACKED, BEFORE_SEND, AFTER_SEND, BEFORE_MESSAGE_ROUTE,
AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER, MESSAGE_EXPIRED);
AFTER_MESSAGE_ROUTE, BEFORE_DELIVER, AFTER_DELIVER, MESSAGE_EXPIRED, BEFORE_ADD_ADDRESS,
AFTER_ADD_ADDRESS);
verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
AFTER_CLOSE_SESSION);

Expand Down Expand Up @@ -257,6 +305,15 @@ public void testSimpleBridge() throws Exception {

}


@Test
public void testUpdateAddress() throws Exception {
server.addOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString("test"), RoutingType.ANYCAST));
server.addOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString("test"), RoutingType.MULTICAST));

verifier.validatePluginMethodsEquals(1, BEFORE_UPDATE_ADDRESS, AFTER_UPDATE_ADDRESS);
}

private class ExpiredPluginVerifier implements ActiveMQServerPlugin {

@Override
Expand Down
Loading