Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,15 @@ public void sendAcknowledged(final Message message) {
}

/* Hook for processing message before forwarding */
protected Message beforeForward(final Message message, final SimpleString forwardingAddress) {
protected Message beforeForward(Message message, final SimpleString forwardingAddress) {
message = message.copy();

return beforeForwardingNoCopy(message, forwardingAddress);
}

/** ClusterConnectionBridge already makes a copy of the message.
* So I needed I hook where the message is not copied. */
protected Message beforeForwardingNoCopy(Message message, SimpleString forwardingAddress) {
if (useDuplicateDetection) {
// We keep our own DuplicateID for the Bridge, so bouncing back and forth will work fine
byte[] bytes = getDuplicateBytes(nodeUUID, message.getMessageID());
Expand All @@ -577,6 +585,8 @@ protected Message beforeForward(final Message message, final SimpleString forwar
break;
}

message.messageChanged();

if (transformer != null) {
final Message transformedMessage = transformer.transform(message);
if (transformedMessage != message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ protected Message beforeForward(final Message message, final SimpleString forwar

messageCopy.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, queueIds);

messageCopy = super.beforeForward(messageCopy, forwardingAddress);
messageCopy = super.beforeForwardingNoCopy(messageCopy, forwardingAddress);

return messageCopy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ public void route(final Message message, final RoutingContext context) throws Ex
if (transformer != null) {
copy = transformer.transform(copy);
}

copy.messageChanged();
} else {
copy = message;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.artemis.tests.integration.amqp;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class AmqpBridgeClusterRedistributionTest extends AmqpClientTestSupport {

protected ActiveMQServer[] servers = new ActiveMQServer[3];
private ActiveMQServer server0;
private ActiveMQServer server1;
private ActiveMQServer server2;
private SimpleString customNotificationQueue;
private SimpleString frameworkNotificationsQueue;
private SimpleString bridgeNotificationsQueue;
private SimpleString notificationsQueue;

private String getServer0URL() {
return "tcp://localhost:61616";
}

private String getServer1URL() {
return "tcp://localhost:61617";
}

private String getServer2URL() {
return "tcp://localhost:61618";
}

@Override
public URI getBrokerAmqpConnectionURI() {
try {
return new URI(getServer0URL());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}

@Override
protected ActiveMQServer createServer(final boolean realFiles,
final Configuration configuration,
final long pageSize,
final long maxAddressSize,
final Map<String, AddressSettings> settings) {
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles));

if (settings != null) {
for (Map.Entry<String, AddressSettings> setting : settings.entrySet()) {
server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
}
}

AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(pageSize).setMaxSizeBytes(maxAddressSize).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setRedeliveryDelay(0).setRedistributionDelay(0).setAutoCreateQueues(true).setAutoCreateAddresses(true);

server.getAddressSettingsRepository().addMatch("#", defaultSetting);

return server;
}

@Override
@Before
public void setUp() throws Exception {
super.setUp();
server0 = createServer(false, createBasicConfig());
server1 = createServer(false, createBasicConfig());
server2 = createServer(false, createBasicConfig());

servers[0] = server0;
servers[1] = server1;
servers[2] = server2;

server0.getConfiguration().addAcceptorConfiguration("acceptor", getServer0URL());
server0.getConfiguration().addConnectorConfiguration("notification-broker", getServer1URL());

server1.getConfiguration().addAcceptorConfiguration("acceptor", getServer1URL());
server2.getConfiguration().addAcceptorConfiguration("acceptor", getServer2URL());

DivertConfiguration customNotificationsDivert = new DivertConfiguration().setName("custom-notifications-divert").setAddress("*.Provider.*.Agent.*.CustomNotification").setForwardingAddress("FrameworkNotifications").setExclusive(true);

DivertConfiguration frameworkNotificationsDivertServer1 = new DivertConfiguration().setName("framework-notifications-divert").setAddress("BridgeNotifications").setForwardingAddress("Notifications").setRoutingType(ComponentConfigurationRoutingType.MULTICAST).setExclusive(true);
DivertConfiguration frameworkNotificationsDivertServer2 = new DivertConfiguration().setName("framework-notifications-divert").setAddress("BridgeNotifications").setForwardingAddress("Notifications").setRoutingType(ComponentConfigurationRoutingType.MULTICAST).setExclusive(true);

server0.getConfiguration().addDivertConfiguration(customNotificationsDivert);

server1.getConfiguration().addDivertConfiguration(frameworkNotificationsDivertServer1);
server2.getConfiguration().addDivertConfiguration(frameworkNotificationsDivertServer2);

customNotificationQueue = SimpleString.toSimpleString("*.Provider.*.Agent.*.CustomNotification");
frameworkNotificationsQueue = SimpleString.toSimpleString("FrameworkNotifications");
bridgeNotificationsQueue = SimpleString.toSimpleString("BridgeNotifications");
notificationsQueue = SimpleString.toSimpleString("Notifications");

setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 2);
setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 2, 1);

server0.start();

server1.start();
server2.start();

server0.createQueue(customNotificationQueue, RoutingType.ANYCAST, customNotificationQueue, null, true, false);
server0.createQueue(frameworkNotificationsQueue, RoutingType.ANYCAST, frameworkNotificationsQueue, null, true, false);

server1.createQueue(bridgeNotificationsQueue, RoutingType.ANYCAST, bridgeNotificationsQueue, null, true, false);
server1.createQueue(notificationsQueue, RoutingType.MULTICAST, notificationsQueue, null, true, false);

server2.createQueue(bridgeNotificationsQueue, RoutingType.ANYCAST, bridgeNotificationsQueue, null, true, false);
server2.createQueue(notificationsQueue, RoutingType.MULTICAST, notificationsQueue, null, true, false);

server0.deployBridge(new BridgeConfiguration().setName("notifications-bridge").setQueueName(frameworkNotificationsQueue.toString()).setForwardingAddress(bridgeNotificationsQueue.toString()).setConfirmationWindowSize(10).setStaticConnectors(Arrays.asList("notification-broker")));
}

@After
@Override
public void tearDown() throws Exception {
try {
if (server0 != null) {
server0.stop();
}
if (server1 != null) {
server1.stop();
}
if (server2 != null) {
server2.stop();
}
} finally {
super.tearDown();
}
}

@Test
public void testSendMessageToBroker0GetFromBroker1() throws Exception {
try (ServerLocator locator = ActiveMQClient.createServerLocator(getServer1URL()); ClientSessionFactory sessionFactory = locator.createSessionFactory(); ClientSession session = sessionFactory.createSession(); ClientConsumer consumer = session.createConsumer(notificationsQueue)) {

session.start();

sendMessages("uswest.Provider.AMC.Agent.DIVERTED.CustomNotification", 1, RoutingType.ANYCAST, true);

ClientMessage message = consumer.receive(5000);
assertNotNull(message);

message = consumer.receiveImmediate();
assertNull(message);
}
}

@Test
public void testSendMessageToBroker0GetFromBroker2() throws Exception {
try (ServerLocator locator = ActiveMQClient.createServerLocator(getServer2URL()); ClientSessionFactory sessionFactory = locator.createSessionFactory(); ClientSession session = sessionFactory.createSession(); ClientConsumer consumer = session.createConsumer(notificationsQueue)) {

session.start();

sendMessages("uswest.Provider.AMC.Agent.DIVERTED.CustomNotification", 1, RoutingType.ANYCAST, true);

ClientMessage message = consumer.receive(5000);
assertNotNull(message);

message = consumer.receiveImmediate();
assertNull(message);
}
}

protected void setupClusterConnection(final String name,
final String address,
final MessageLoadBalancingType messageLoadBalancingType,
final int maxHops,
final boolean netty,
final int nodeFrom,
final int... nodesTo) {
setupClusterConnection(name, address, messageLoadBalancingType, maxHops, netty, null, nodeFrom, nodesTo);
}

protected void setupClusterConnection(final String name,
final String address,
final MessageLoadBalancingType messageLoadBalancingType,
final int maxHops,
final boolean netty,
final ClusterTestBase.ClusterConfigCallback cb,
final int nodeFrom,
final int... nodesTo) {
ActiveMQServer serverFrom = servers[nodeFrom];

if (serverFrom == null) {
throw new IllegalStateException("No server at node " + nodeFrom);
}

TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom);

List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo);
Configuration config = serverFrom.getConfiguration();
ClusterConnectionConfiguration clusterConf = createClusterConfig(name, address, messageLoadBalancingType, maxHops, connectorFrom, pairs);

if (cb != null) {
cb.configure(clusterConf);
}
config.getClusterConfigurations().add(clusterConf);
}

private List<String> getClusterConnectionTCNames(boolean netty, ActiveMQServer serverFrom, int[] nodesTo) {
List<String> pairs = new ArrayList<>();
for (int element : nodesTo) {
TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty));
serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc);
pairs.add(serverTotc.getName());
}
return pairs;
}

private ClusterConnectionConfiguration createClusterConfig(final String name,
final String address,
final MessageLoadBalancingType messageLoadBalancingType,
final int maxHops,
TransportConfiguration connectorFrom,
List<String> pairs) {
return new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorFrom.getName()).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,7 @@ public void testWithDuplicates() throws Exception {

final String testAddress = "testAddress";
final String queueName0 = "queue0";
final String secondQueue = "queue1";
final String forwardAddress = "forwardAddress";
final String queueName1 = "forwardQueue";

Expand All @@ -827,6 +828,8 @@ public void testWithDuplicates() throws Exception {
CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
queueConfigs0.add(queueConfig0);
queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(secondQueue);
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);

server0.start();
Expand Down Expand Up @@ -882,7 +885,8 @@ public void testWithDuplicates() throws Exception {
tx.commit();
}

Thread.sleep(1000);
Thread.sleep(100);


ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);

Expand All @@ -909,6 +913,20 @@ public void testWithDuplicates() throws Exception {

Assert.assertNull(consumer1.receiveImmediate());

ClientConsumer otherConsumer = session0.createConsumer(secondQueue);
session0.start();
for (int i = 0; i < numMessages; i++) {
ClientMessage message = otherConsumer.receive(5000);
Assert.assertNotNull(message);
// This is validating the Bridge is not messing up with the original message
// and should make a copy of the message before sending it
Assert.assertEquals(2, message.getPropertyNames().size());
Assert.assertEquals(i, message.getIntProperty(propKey).intValue());
Assert.assertEquals(new SimpleString("monkey" + i), message.getSimpleStringProperty(selectorKey));
message.acknowledge();

}

consumer1.close();

session1.deleteQueue(queueName1);
Expand Down