Skip to content

Commit

Permalink
ARTEMIS-4196 - set message routing type in mqtt publish
Browse files Browse the repository at this point in the history
  • Loading branch information
gtully authored and jbertram committed Mar 8, 2023
1 parent ed5322c commit 00cae02
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerProducer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.UUIDGenerator;
Expand Down Expand Up @@ -223,8 +224,13 @@ void sendToQueue(MqttPublishMessage message, boolean internal) throws Exception

Transaction tx = session.getServerSession().newTransaction();
try {
if (session.getServer().getAddressInfo(address) == null && session.getServer().getAddressSettingsRepository().getMatch(coreAddress).isAutoCreateAddresses()) {
AddressInfo addressInfo = session.getServer().getAddressInfo(address);
if (addressInfo == null && session.getServer().getAddressSettingsRepository().getMatch(coreAddress).isAutoCreateAddresses()) {
session.getServerSession().createAddress(address, RoutingType.MULTICAST, true);
serverMessage.setRoutingType(RoutingType.MULTICAST);
}
if (addressInfo != null) {
serverMessage.setRoutingType(addressInfo.getRoutingType());
}
session.getServerSession().send(tx, serverMessage, true, senderName, false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,10 @@ private Binding getNextBinding(final Message message,
private static boolean matchBinding(final Message message,
final Binding binding,
final MessageLoadBalancingType loadBalancingType) {
if ((loadBalancingType.equals(MessageLoadBalancingType.OFF) || loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION) && !Objects.equals(message.getRoutingType(), RoutingType.MULTICAST)) && binding instanceof RemoteQueueBinding) {
return false;
if (loadBalancingType.equals(MessageLoadBalancingType.OFF) || loadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)) {
if (!Objects.equals(message.getRoutingType(), RoutingType.MULTICAST) && binding instanceof RemoteQueueBinding) {
return false;
}
}

final Filter filter = binding.getFilter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1350,8 +1350,8 @@ public void doTestSendMQTTReceiveJMS(String jmsTopicAddress, String mqttAddress)
connection.start();

Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue jmsQueue = s.createQueue(jmsTopicAddress);
MessageConsumer consumer = s.createConsumer(jmsQueue);
javax.jms.Topic jmsTopic = s.createTopic(jmsTopicAddress);
MessageConsumer consumer = s.createConsumer(jmsTopic);

provider.publish(address, RETAINED.getBytes(), AT_LEAST_ONCE, true);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.mqtt;

import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.junit.Test;

public class MqttClusterRemoteSubscribeLoadBalanceOffTest extends ClusterTestBase {

@Override
protected boolean isResolveProtocols() {
return true;
}

public boolean isNetty() {
return true;
}

@Test
public void testPub0Sub1() throws Exception {
final String TOPIC = "test/1";
final String clientId1 = "clientId1";
final String clientId2 = "clientId2";
Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)};

setupServers(TOPIC);

startServers(0, 1);

final BlockingConnection connection1 = retrieveMQTTConnection("tcp://localhost:61616", clientId1);
final BlockingConnection connection2 = retrieveMQTTConnection("tcp://localhost:61617", clientId2);

assertTrue("Should be connected", Wait.waitFor(() -> connection1.isConnected(), 5000, 100));
assertTrue("Should be connected", Wait.waitFor(() -> connection2.isConnected(), 5000, 100));

waitForTopology(servers[0], "cluster0", 2, 5000);
waitForTopology(servers[1], "cluster1", 2, 5000);

// Subscribe to topics
connection1.subscribe(topics);
connection2.subscribe(topics);


waitForBindings(0, TOPIC, 1, 1, false);
waitForBindings(1, TOPIC, 1, 1, false);

// Publish Messages
String payload1 = "This is message 1";
String payload2 = "This is message 2";

connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
connection2.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, false);

Message message1 = connection1.receive(5, TimeUnit.SECONDS);
message1.ack();
Message message2 = connection1.receive(5, TimeUnit.SECONDS);
message2.ack();

message1 = connection2.receive(5, TimeUnit.SECONDS);
message1.ack();
message2 = connection2.receive(5, TimeUnit.SECONDS);
message2.ack();

String[] topicsStrings = new String[]{TOPIC};
if (connection1 != null && connection1.isConnected()) {
connection1.unsubscribe(topicsStrings);
connection1.disconnect();
}
if (connection2 != null && connection2.isConnected()) {
connection2.unsubscribe(topicsStrings);
connection2.disconnect();
}
}

private static BlockingConnection retrieveMQTTConnection(String host, String clientId) throws Exception {
MQTT mqtt = new MQTT();
mqtt.setHost(host);
mqtt.setClientId(clientId);
mqtt.setConnectAttemptsMax(0);
mqtt.setReconnectAttemptsMax(0);
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
return connection;
}

private void setupServers(String address) throws Exception {

WildcardConfiguration wildcardConfiguration = createWildCardConfiguration();
CoreAddressConfiguration coreAddressConfiguration = createAddressConfiguration(address);
AddressSettings addressSettings = createAddressSettings();

setupServer(0, false, isNetty());
servers[0].getConfiguration().addAddressConfiguration(coreAddressConfiguration);
servers[0].getConfiguration().addAddressSetting("#", addressSettings);
servers[0].getConfiguration().setWildCardConfiguration(wildcardConfiguration);

setupServer(1, false, isNetty());
servers[1].getConfiguration().addAddressConfiguration(coreAddressConfiguration);
servers[1].getConfiguration().addAddressSetting("#", addressSettings);
servers[1].getConfiguration().setWildCardConfiguration(wildcardConfiguration);

setupClusterConnection("cluster0", "", MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION, 1, isNetty(), 0, 1);
setupClusterConnection("cluster1", "", MessageLoadBalancingType.OFF, 1, isNetty(), 1, 0);
}

private AddressSettings createAddressSettings() {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setRedistributionDelay(0);
addressSettings.setDefaultAddressRoutingType(RoutingType.MULTICAST);
return addressSettings;
}

private CoreAddressConfiguration createAddressConfiguration(String TOPIC) {
CoreAddressConfiguration coreAddressConfiguration = new CoreAddressConfiguration();
coreAddressConfiguration.addRoutingType(RoutingType.MULTICAST);
coreAddressConfiguration.setName(TOPIC);
return coreAddressConfiguration;
}

private WildcardConfiguration createWildCardConfiguration() {
WildcardConfiguration wildcardConfiguration = new WildcardConfiguration();
wildcardConfiguration.setAnyWords('#');
wildcardConfiguration.setDelimiter('/');
wildcardConfiguration.setRoutingEnabled(true);
wildcardConfiguration.setSingleWord('+');
return wildcardConfiguration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -483,17 +483,8 @@ public void useSameClientIdAndAnycastSubscribeRemoteQueueWildCard() throws Excep
pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);

Message message1 = subConnection2.receive(5, TimeUnit.SECONDS);
message1.ack();
Message message2 = subConnection2.receive(5, TimeUnit.SECONDS);
message2.ack();
Message message3 = subConnection2.receive(5, TimeUnit.SECONDS);
message3.ack();

assertEquals(payload1, new String(message1.getPayload()));
assertEquals(payload2, new String(message2.getPayload()));
assertEquals(payload3, new String(message3.getPayload()));

// pub queue gets auto created, the routing type set on the message to reflect that and the
// message does not get routed to the sub queue that has anycast
subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC});

waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
Expand All @@ -506,13 +497,6 @@ public void useSameClientIdAndAnycastSubscribeRemoteQueueWildCard() throws Excep
pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);

Message message11 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message11);
Message message21 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message21);
Message message31 = subConnection2.receive(100, TimeUnit.MILLISECONDS);
assertNull(message31);

} finally {
String[] topics = new String[]{ANYCAST_TOPIC};
if (subConnection1 != null && subConnection1.isConnected()) {
Expand Down Expand Up @@ -576,20 +560,8 @@ public void useDiffClientIdAndAnycastSubscribeRemoteQueueWildCard() throws Excep
connection1.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);


Message message1 = connection1.receive(5, TimeUnit.SECONDS);
assertNotNull(message1);
message1.ack();
Message message2 = connection2.receive(5, TimeUnit.SECONDS);
assertNotNull(message2);
message2.ack();
Message message3 = connection1.receive(5, TimeUnit.SECONDS);
assertNotNull(message3);
message3.ack();

assertEquals(payload1, new String(message1.getPayload()));
assertEquals(payload2, new String(message2.getPayload()));
assertEquals(payload3, new String(message3.getPayload()));

// the pub queue is auto created and the message multicast routing type won't match the anycast sub queue
// so nothing gets routed to this queue

connection2.unsubscribe(new String[]{ANYCAST_TOPIC});

Expand All @@ -603,24 +575,6 @@ public void useDiffClientIdAndAnycastSubscribeRemoteQueueWildCard() throws Excep
connection1.publish("anycast/test/1/some/la", payload2.getBytes(), QoS.AT_MOST_ONCE, false);
connection1.publish("anycast/test/1/some/la", payload3.getBytes(), QoS.AT_MOST_ONCE, false);

Message message11 = connection1.receive(5, TimeUnit.SECONDS);
assertNotNull(message11);
message11.ack();
Message message21 = connection1.receive(5, TimeUnit.SECONDS);
assertNotNull(message21);
message21.ack();
Message message31 = connection1.receive(5, TimeUnit.SECONDS);
assertNotNull(message31);
message31.ack();


String message11String = new String(message11.getPayload());
String message21String = new String(message21.getPayload());
String message31String = new String(message31.getPayload());
assertTrue(payload1.equals(message11String) || payload1.equals(message21String) || payload1.equals(message31String) );
assertTrue(payload2.equals(message11String) || payload2.equals(message21String) || payload2.equals(message31String) );
assertTrue(payload3.equals(message11String) || payload3.equals(message21String) || payload3.equals(message31String) );

} finally {
String[] topics = new String[]{ANYCAST_TOPIC};
if (connection1 != null && connection1.isConnected()) {
Expand Down

0 comments on commit 00cae02

Please sign in to comment.